wip: good progress toward new multi-key index for cache

This commit is contained in:
Greg Burd 2017-10-18 12:17:00 -04:00
parent 32b06e1494
commit f64d5edd7c
20 changed files with 633 additions and 505 deletions

View file

@ -64,7 +64,7 @@ dependencies {
compile group: 'org.aspectj', name: 'aspectjweaver', version: '1.8.10'
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.6'
compile group: 'org.springframework', name: 'spring-core', version: '4.3.10.RELEASE'
compile group: 'org.ahocorasick', name: 'ahocorasick', version: '0.4.0'
compile group: 'com.google.guava', name: 'guava', version: '20.0'
compile group: 'com.diffplug.durian', name: 'durian', version: '3.+'
compile group: 'io.zipkin.java', name: 'zipkin', version: '1.29.2'

View file

@ -35,6 +35,7 @@
<orderEntry type="library" name="Maven: org.springframework:spring-core:4.3.10.RELEASE" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:20.0" level="project" />
<orderEntry type="library" name="Maven: org.ahocorasick:ahocorasick:0.4.0" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.java:zipkin:1.29.2" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.brave:brave:4.0.6" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.reporter:zipkin-reporter:0.6.12" level="project" />

View file

@ -148,6 +148,12 @@
<version>20.0</version>
</dependency>
<dependency>
<groupId>org.ahocorasick</groupId>
<artifactId>ahocorasick</artifactId>
<version>0.4.0</version>
</dependency>
<!-- Metrics and tracing -->
<dependency>
<groupId>io.zipkin.java</groupId>

View file

@ -17,15 +17,22 @@ package net.helenus.core;
import com.diffplug.common.base.Errors;
import com.google.common.collect.TreeTraverser;
import net.helenus.core.cache.EntityIdentifyingFacet;
import net.helenus.support.Either;
import org.ahocorasick.trie.Emit;
import org.ahocorasick.trie.Trie;
import java.util.*;
import java.util.stream.Collectors;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfWork, AutoCloseable {
public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfWork<E>, AutoCloseable {
private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>();
private final HelenusSession session;
private final AbstractUnitOfWork<E> parent;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
private final Map<String, Set<Object>> cache = new HashMap<String, Set<Object>>();
private final Map<String, Either<Object, Set<Object>>> cache = new HashMap<String, Either<Object, Set<Object>>>();
private Trie cacheIndex = Trie.builder().ignoreOverlaps().onlyWholeWordsWhiteSpaceSeparated().build();
private boolean aborted = false;
private boolean committed = false;
@ -36,14 +43,15 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
this.parent = parent;
}
public UnitOfWork addNestedUnitOfWork(UnitOfWork uow) {
@Override
public void addNestedUnitOfWork(UnitOfWork<E> uow) {
synchronized (nested) {
nested.add((AbstractUnitOfWork<E>) uow);
}
return this;
}
public UnitOfWork begin() {
@Override
public UnitOfWork<E> begin() {
// log.record(txn::start)
return this;
}
@ -56,20 +64,49 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
}
}
public Set<Object> cacheLookup(String key) {
Set<Object> r = getCache().get(key);
if (r != null) {
return r;
} else {
if (parent != null) {
return parent.cacheLookup(key);
@Override
public Optional<Either<Object, Set<Object>>> cacheLookupByFacet(Set<EntityIdentifyingFacet> facets) {
Optional<Either<Object, Set<Object>>> result = null;
Collection<Emit> emits = cacheIndex.parseText(String.join(" ", facets.stream()
.map(facet -> facet.toString()).collect(Collectors.toList())));
for (Emit emit : emits) {
// NOTE: rethink. should this match *all* facets? how do I know which emit keyword is the primary key?
String key = emit.getKeyword();
result = cacheLookup(key);
}
}
return null;
return result;
}
public Map<String, Set<Object>> getCache() {
return cache;
@Override
public Optional<Either<Object, Set<Object>>> cacheLookupByStatement(String[] statementKeys) {
Optional<Either<Object, Set<Object>>> result = null;
String key = String.join(",", statementKeys);
return cacheLookup(key);
}
@Override
public Optional<Either<Object, Set<Object>>> cacheLookup(String key) {
Optional<Either<Object, Set<Object>>> result = Optional.of(cache.get(key));
if (result.isPresent()) {
return result;
} else {
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
if (parent != null) {
return parent.cacheLookup(key);
}
}
return Optional.empty();
}
@Override
public void cacheUpdate(Either<Object, Set<Object>> value, String[] statementKeys, Set<EntityIdentifyingFacet> facets) {
String key = String.join(",", statementKeys);
cache.put(key, value);
Trie.TrieBuilder builder = cacheIndex.builder();
facets.forEach(facet -> {
builder.addKeyword(facet.toString());
});
}
private Iterator<AbstractUnitOfWork<E>> getChildNodes() {
@ -108,18 +145,7 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
// Merge UOW cache into parent's cache.
if (parent != null) {
Map<String, Set<Object>> parentCache = parent.getCache();
for (String key : cache.keySet()) {
if (parentCache.containsKey(key)) {
// merge the sets
Set<Object> ps = parentCache.get(key);
ps.addAll(
cache.get(key)); //TODO(gburd): review this, likely not correct in all cases as-is.
} else {
// add the missing set
parentCache.put(key, cache.get(key));
}
}
parent.assumeCache(cache, cacheIndex);
}
// Apply all post-commit functions for
@ -155,6 +181,37 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
// cache.invalidateSince(txn::start time)
}
private void assumeCache(Map<String, Either<Object, Set<Object>>> childCache, Trie childCacheIndex) {
for (String key : childCache.keySet()) {
if (cache.containsKey(key)) {
Either<Object, Set<Object>> value = cache.get(key);
if (value.isLeft()) {
Object obj = value.getLeft();
// merge objects
Either<Object, Set<Object>> childValue = childCache.get(key);
if (childValue.isLeft()) {
Object childObj = childValue.getLeft();
} else {
Set<Object> childSet = childValue.getRight();
}
} else {
// merge the sets
Set<Object> set = value.getRight();
Either<Object, Set<Object>> childValue = childCache.get(key);
if (childValue.isLeft()) {
Object childObj = childValue.getLeft();
set.add(childObj);
} else {
Set<Object> childSet = childValue.getRight();
set.addAll(childSet);
}
}
} else {
cache.put(key, childCache.get(key));
}
}
}
public String describeConflicts() {
return "it's complex...";
}

View file

@ -15,10 +15,13 @@
*/
package net.helenus.core;
import java.util.Map;
import net.helenus.core.cache.EntityIdentifyingFacet;
import net.helenus.support.Either;
import java.util.Optional;
import java.util.Set;
public interface UnitOfWork<E extends Exception> extends AutoCloseable {
public interface UnitOfWork<X extends Exception> extends AutoCloseable {
/**
* Marks the beginning of a transactional section of work. Will write a record to the shared
@ -26,17 +29,17 @@ public interface UnitOfWork<E extends Exception> extends AutoCloseable {
*
* @return the handle used to commit or abort the work.
*/
UnitOfWork begin();
UnitOfWork<X> begin();
UnitOfWork addNestedUnitOfWork(UnitOfWork uow);
void addNestedUnitOfWork(UnitOfWork<X> uow);
/**
* Checks to see if the work performed between calling begin and now can be committed or not.
*
* @return a function from which to chain work that only happens when commit is successful
* @throws E when the work overlaps with other concurrent writers.
* @throws X when the work overlaps with other concurrent writers.
*/
PostCommitFunction<Void, Void> commit() throws E;
PostCommitFunction<Void, Void> commit() throws X;
/**
* Explicitly abort the work within this unit of work. Any nested aborted unit of work will
@ -48,8 +51,9 @@ public interface UnitOfWork<E extends Exception> extends AutoCloseable {
boolean hasCommitted();
//Either<Object, Set<Object>> cacheLookup(String key);
Set<Object> cacheLookup(String key);
Optional<Either<Object, Set<Object>>> cacheLookup(String key);
Optional<Either<Object, Set<Object>>> cacheLookupByFacet(Set<EntityIdentifyingFacet> facets);
Optional<Either<Object, Set<Object>>> cacheLookupByStatement(String[] statementKeys);
void cacheUpdate(Either<Object, Set<Object>> pojo, String[] statementKeys, Set<EntityIdentifyingFacet> facets);
Map<String, Set<Object>> getCache();
}

View file

@ -2,9 +2,27 @@ package net.helenus.core.cache;
import net.helenus.mapping.HelenusProperty;
import java.util.Set;
public class EntityIdentifyingFacet extends Facet {
public EntityIdentifyingFacet(HelenusProperty prop) {}
public EntityIdentifyingFacet(HelenusProperty[]... props) {}
public boolean isFullyBound() {
return false;
}
public HelenusProperty getProperty() {
return null;
}
public Set<HelenusProperty> getUnboundEntityProperties() {
return null;
}
public void setValueForProperty(HelenusProperty prop, Object value) {
}
}

View file

@ -22,9 +22,7 @@ import java.util.Map;
import net.helenus.core.*;
import net.helenus.mapping.HelenusProperty;
public abstract class AbstractFilterStreamOperation<
E, O extends AbstractFilterStreamOperation<E, O>>
extends AbstractStreamOperation<E, O> {
public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>> extends AbstractStreamOperation<E, O> {
protected Map<HelenusProperty, Filter<?>> filters = null;
protected List<Filter<?>> ifFilters = null;

View file

@ -21,164 +21,109 @@ import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.HashSet;
import java.util.Map;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.Filter;
import net.helenus.core.Helenus;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.EntityIdentifyingFacet;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusProperty;
import net.helenus.mapping.value.BeanColumnValueProvider;
import net.helenus.support.Either;
import javax.swing.text.html.parser.Entity;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends AbstractStatementOperation<E, O> {
extends AbstractStatementOperation<E, O> {
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
}
public abstract Optional<E> transform(ResultSet resultSet);
public PreparedOptionalOperation<E> prepare() {
return new PreparedOptionalOperation<E>(prepareStatement(), this);
}
public ListenableFuture<PreparedOptionalOperation<E>> prepareAsync() {
final O _this = (O) this;
return Futures.transform(
prepareStatementAsync(),
new Function<PreparedStatement, PreparedOptionalOperation<E>>() {
@Override
public PreparedOptionalOperation<E> apply(PreparedStatement preparedStatement) {
return new PreparedOptionalOperation<E>(preparedStatement, _this);
}
});
}
public Optional<E> sync() throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
return transform(resultSet);
} finally {
context.stop();
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
}
}
public Optional<E> sync(UnitOfWork uow) throws TimeoutException {
if (uow == null) return sync();
public abstract Optional<E> transform(ResultSet resultSet);
final Timer.Context context = requestLatency.time();
try {
public PreparedOptionalOperation<E> prepare() {
return new PreparedOptionalOperation<E>(prepareStatement(), this);
}
Optional<E> result = null;
String stmtKey = null;
if (enableCache) {
Set<EntityIdentifyingFacet> facets = getIdentifyingFacets();
if (!facets.isEmpty()) {
for (EntityIdentifyingFacet facet : facets) {
//TODO(gburd): what about select ResultSet, Tuple... etc.?
Optional<Either<Set<E>, E>> optionalCachedResult = uow.cacheLookup(facet.hashCode());
if (optionalCachedResult.isPresent()) {
uowCacheHits.mark();
logger.info("UnitOfWork({}) cache hit for facet: {} with key: {}", uow.hashCode(), facet.toString(), facet.hashCode());
Either<Set<E>, E> eitherCachedResult = optionalCachedResult.get();
if (eitherCachedResult.isRight()) {
E cachedResult = eitherCachedResult.getRight();
result = Optional.of(cachedResult);
}
break;
}
}
} else {
// The operation didn't provide enough information to uniquely identify the entity object
// using one of the facets, but that doesn't mean a filtering query won't return a proper
// result. Look in the cache to see if this statement has been executed before.
stmtKey = getStatementCacheKey();
Optional<Either<Set<E>, E>> optionalCachedResult = uow.cacheLookup(stmtKey.hashCode());
if (optionalCachedResult.isPresent()) {
Either<Set<E>, E> eitherCachedResult = optionalCachedResult.get();
if (eitherCachedResult.isLeft()) {
Set<E> cachedResult = eitherCachedResult.getLeft();
// Ensure that this non-indexed selection uniquely identified an Entity.
if (!(cachedResult.isEmpty() || cachedResult.size() > 1)) {
uowCacheHits.mark();
logger.info("UnitOfWork({}) cache hit for stmt {} {}", uow.hashCode(), stmtKey,
stmtKey.hashCode());
result = cachedResult.stream().findFirst();
}
}
}
}
if (result == null) {
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, true);
result = transform(resultSet);
if (enableCache && result.isPresent()) {
// If we executed a query that didn't depend on an we have a stmtKey for the filters, add that to the cache.
if (stmtKey != null) {
Set<Object> set = new HashSet<Object>(1);
set.add(result.get());
uow.getCache().put(stmtKey.hashCode(), set);
}
// Now insert this entity into the cache for each facet for this entity that we can fully bind.
E entity = result.get();
Map<String, EntityIdentifyingFacet> facetMap = Helenus.entity(result.get().getClass()).getIdentityFacets();
facetMap.forEach((facetName, facet) -> {
EntityIdentifyingFacet boundFacet = null;
if (!facet.isFullyBound()) {
boundFacet = new EntityIdentifyingFacet(facet);
for (HelenusProperty prop : facet.getUnboundEntityProperties()) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(entity, -1, prop);
if (value == null) { break; }
boundFacet.setValueForProperty(prop, value);
public ListenableFuture<PreparedOptionalOperation<E>> prepareAsync() {
final O _this = (O) this;
return Futures.transform(
prepareStatementAsync(),
new Function<PreparedStatement, PreparedOptionalOperation<E>>() {
@Override
public PreparedOptionalOperation<E> apply(PreparedStatement preparedStatement) {
return new PreparedOptionalOperation<E>(preparedStatement, _this);
}
}
if (boundFacet != null && boundFacet.isFullyBound()) {
uow.getCache().put(boundFacet.hashCode(), Either)
}
});
Set<Object> set = new HashSet<Object>(1);
set.add(result.get());
uow.getCache().put(key, set);
} else {
uow.getCache().put(key, new HashSet<Object>(0));
}
}
}
return result;
} finally {
context.stop();
});
}
}
public CompletableFuture<Optional<E>> async() {
return CompletableFuture.<Optional<E>>supplyAsync(() -> {
public Optional<E> sync() throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
return sync();
} catch (TimeoutException ex) { throw new CompletionException(ex); }
});
}
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, false);
return transform(resultSet);
} finally {
context.stop();
}
}
public CompletableFuture<Optional<E>> async(UnitOfWork uow) {
if (uow == null) return async();
return CompletableFuture.<Optional<E>>supplyAsync(() -> {
try {
public Optional<E> sync(UnitOfWork<?> uow) throws TimeoutException {
if (uow == null)
return sync();
} catch (TimeoutException ex) { throw new CompletionException(ex); }
});
}
final Timer.Context context = requestLatency.time();
try {
Optional<E> result = Optional.empty();
String[] statementKeys = null;
if (enableCache) {
Set<EntityIdentifyingFacet> facets = getFacets();
statementKeys = getQueryKeys();
result = Optional.of(checkCache(uow, facets, statementKeys));
}
if (!result.isPresent()) {
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
// Transform the query result set into the desired shape.
result = transform(resultSet);
}
// If we have a result and we're caching then we need to put it into the cache for future requests to find.
if (enableCache && result.isPresent()) {
updateCache(uow, result.get(), statementKeys);
}
return result;
} finally {
context.stop();
}
}
public CompletableFuture<Optional<E>> async() {
return CompletableFuture.<Optional<E>>supplyAsync(() -> {
try {
return sync();
}
catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
public CompletableFuture<Optional<E>> async(UnitOfWork<?> uow) {
if (uow == null)
return async();
return CompletableFuture.<Optional<E>>supplyAsync(() -> {
try {
return sync();
}
catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
}

View file

@ -28,283 +28,357 @@ import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.google.common.util.concurrent.ListenableFuture;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.Helenus;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.EntityIdentifyingFacet;
import net.helenus.mapping.HelenusProperty;
import net.helenus.mapping.value.BeanColumnValueProvider;
import net.helenus.support.Either;
import net.helenus.support.HelenusException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>>
extends Operation<E> {
extends Operation<E> {
final Logger logger = LoggerFactory.getLogger(getClass());
public abstract Statement buildStatement(boolean cached);
protected boolean enableCache = true;
protected boolean showValues = true;
protected TraceContext traceContext;
private ConsistencyLevel consistencyLevel;
private ConsistencyLevel serialConsistencyLevel;
private RetryPolicy retryPolicy;
private boolean idempotent = false;
private boolean enableTracing = false;
private long[] defaultTimestamp = null;
private int[] fetchSize = null;
long queryExecutionTimeout = 10;
TimeUnit queryTimeoutUnits = TimeUnit.SECONDS;
public AbstractStatementOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
this.consistencyLevel = sessionOperations.getDefaultConsistencyLevel();
this.idempotent = sessionOperations.getDefaultQueryIdempotency();
}
public O ignoreCache(boolean enabled) {
enableCache = enabled;
return (O) this;
}
public O ignoreCache() {
enableCache = true;
return (O) this;
}
public O showValues(boolean enabled) {
this.showValues = enabled;
return (O) this;
}
public O defaultTimestamp(long timestamp) {
this.defaultTimestamp = new long[1];
this.defaultTimestamp[0] = timestamp;
return (O) this;
}
public O retryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return (O) this;
}
public O defaultRetryPolicy() {
this.retryPolicy = DefaultRetryPolicy.INSTANCE;
return (O) this;
}
public O idempotent() {
this.idempotent = true;
return (O) this;
}
public O isIdempotent(boolean idempotent) {
this.idempotent = idempotent;
return (O) this;
}
public O downgradingConsistencyRetryPolicy() {
this.retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
return (O) this;
}
public O fallthroughRetryPolicy() {
this.retryPolicy = FallthroughRetryPolicy.INSTANCE;
return (O) this;
}
public O consistency(ConsistencyLevel level) {
this.consistencyLevel = level;
return (O) this;
}
public O consistencyAny() {
this.consistencyLevel = ConsistencyLevel.ANY;
return (O) this;
}
public O consistencyOne() {
this.consistencyLevel = ConsistencyLevel.ONE;
return (O) this;
}
public O consistencyQuorum() {
this.consistencyLevel = ConsistencyLevel.QUORUM;
return (O) this;
}
public O consistencyAll() {
this.consistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O consistencyLocalOne() {
this.consistencyLevel = ConsistencyLevel.LOCAL_ONE;
return (O) this;
}
public O consistencyLocalQuorum() {
this.consistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
return (O) this;
}
public O consistencyEachQuorum() {
this.consistencyLevel = ConsistencyLevel.EACH_QUORUM;
return (O) this;
}
public O serialConsistency(ConsistencyLevel level) {
this.serialConsistencyLevel = level;
return (O) this;
}
public O serialConsistencyAny() {
this.serialConsistencyLevel = ConsistencyLevel.ANY;
return (O) this;
}
public O serialConsistencyOne() {
this.serialConsistencyLevel = ConsistencyLevel.ONE;
return (O) this;
}
public O serialConsistencyQuorum() {
this.serialConsistencyLevel = ConsistencyLevel.QUORUM;
return (O) this;
}
public O serialConsistencyAll() {
this.serialConsistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O serialConsistencyLocal() {
this.serialConsistencyLevel = ConsistencyLevel.LOCAL_SERIAL;
return (O) this;
}
public O serialConsistencyLocalQuorum() {
this.serialConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
return (O) this;
}
public O disableTracing() {
this.enableTracing = false;
return (O) this;
}
public O enableTracing() {
this.enableTracing = true;
return (O) this;
}
public O tracing(boolean enable) {
this.enableTracing = enable;
return (O) this;
}
public O fetchSize(int fetchSize) {
this.fetchSize = new int[1];
this.fetchSize[0] = fetchSize;
return (O) this;
}
public O queryTimeoutMs(long ms) {
this.queryExecutionTimeout = ms;
this.queryTimeoutUnits = TimeUnit.MILLISECONDS;
return (O) this;
}
public O queryTimeout(long timeout, TimeUnit units) {
this.queryExecutionTimeout = timeout;
this.queryTimeoutUnits = units;
return (O) this;
}
public Statement options(Statement statement) {
if (defaultTimestamp != null) {
statement.setDefaultTimestamp(defaultTimestamp[0]);
final Logger logger = LoggerFactory.getLogger(getClass());
protected boolean enableCache = true;
protected boolean showValues = true;
protected TraceContext traceContext;
long queryExecutionTimeout = 10;
TimeUnit queryTimeoutUnits = TimeUnit.SECONDS;
private ConsistencyLevel consistencyLevel;
private ConsistencyLevel serialConsistencyLevel;
private RetryPolicy retryPolicy;
private boolean idempotent = false;
private boolean enableTracing = false;
private long[] defaultTimestamp = null;
private int[] fetchSize = null;
public AbstractStatementOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
this.consistencyLevel = sessionOperations.getDefaultConsistencyLevel();
this.idempotent = sessionOperations.getDefaultQueryIdempotency();
}
if (consistencyLevel != null) {
statement.setConsistencyLevel(consistencyLevel);
public abstract Statement buildStatement(boolean cached);
public O ignoreCache(boolean enabled) {
enableCache = enabled;
return (O) this;
}
if (serialConsistencyLevel != null) {
statement.setSerialConsistencyLevel(serialConsistencyLevel);
public O ignoreCache() {
enableCache = true;
return (O) this;
}
if (retryPolicy != null) {
statement.setRetryPolicy(retryPolicy);
public O showValues(boolean enabled) {
this.showValues = enabled;
return (O) this;
}
if (enableTracing) {
statement.enableTracing();
} else {
statement.disableTracing();
public O defaultTimestamp(long timestamp) {
this.defaultTimestamp = new long[1];
this.defaultTimestamp[0] = timestamp;
return (O) this;
}
if (fetchSize != null) {
statement.setFetchSize(fetchSize[0]);
public O retryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return (O) this;
}
if (idempotent) {
statement.setIdempotent(true);
public O defaultRetryPolicy() {
this.retryPolicy = DefaultRetryPolicy.INSTANCE;
return (O) this;
}
return statement;
}
public O zipkinContext(TraceContext traceContext) {
if (traceContext != null) {
Tracer tracer = this.sessionOps.getZipkinTracer();
if (tracer != null) {
this.traceContext = traceContext;
}
public O idempotent() {
this.idempotent = true;
return (O) this;
}
return (O) this;
}
public Statement statement() {
return buildStatement(false);
}
public String cql() {
Statement statement = buildStatement(false);
if (statement == null) return "";
if (statement instanceof BuiltStatement) {
BuiltStatement buildStatement = (BuiltStatement) statement;
return buildStatement.setForceNoValues(true).getQueryString();
} else {
return statement.toString();
}
}
public PreparedStatement prepareStatement() {
Statement statement = buildStatement(true);
if (statement instanceof RegularStatement) {
RegularStatement regularStatement = (RegularStatement) statement;
return sessionOps.prepare(regularStatement);
public O isIdempotent(boolean idempotent) {
this.idempotent = idempotent;
return (O) this;
}
throw new HelenusException("only RegularStatements can be prepared");
}
public ListenableFuture<PreparedStatement> prepareStatementAsync() {
Statement statement = buildStatement(true);
if (statement instanceof RegularStatement) {
RegularStatement regularStatement = (RegularStatement) statement;
return sessionOps.prepareAsync(regularStatement);
public O downgradingConsistencyRetryPolicy() {
this.retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
return (O) this;
}
public O fallthroughRetryPolicy() {
this.retryPolicy = FallthroughRetryPolicy.INSTANCE;
return (O) this;
}
public O consistency(ConsistencyLevel level) {
this.consistencyLevel = level;
return (O) this;
}
public O consistencyAny() {
this.consistencyLevel = ConsistencyLevel.ANY;
return (O) this;
}
public O consistencyOne() {
this.consistencyLevel = ConsistencyLevel.ONE;
return (O) this;
}
public O consistencyQuorum() {
this.consistencyLevel = ConsistencyLevel.QUORUM;
return (O) this;
}
public O consistencyAll() {
this.consistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O consistencyLocalOne() {
this.consistencyLevel = ConsistencyLevel.LOCAL_ONE;
return (O) this;
}
public O consistencyLocalQuorum() {
this.consistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
return (O) this;
}
public O consistencyEachQuorum() {
this.consistencyLevel = ConsistencyLevel.EACH_QUORUM;
return (O) this;
}
public O serialConsistency(ConsistencyLevel level) {
this.serialConsistencyLevel = level;
return (O) this;
}
public O serialConsistencyAny() {
this.serialConsistencyLevel = ConsistencyLevel.ANY;
return (O) this;
}
public O serialConsistencyOne() {
this.serialConsistencyLevel = ConsistencyLevel.ONE;
return (O) this;
}
public O serialConsistencyQuorum() {
this.serialConsistencyLevel = ConsistencyLevel.QUORUM;
return (O) this;
}
public O serialConsistencyAll() {
this.serialConsistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O serialConsistencyLocal() {
this.serialConsistencyLevel = ConsistencyLevel.LOCAL_SERIAL;
return (O) this;
}
public O serialConsistencyLocalQuorum() {
this.serialConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
return (O) this;
}
public O disableTracing() {
this.enableTracing = false;
return (O) this;
}
public O enableTracing() {
this.enableTracing = true;
return (O) this;
}
public O tracing(boolean enable) {
this.enableTracing = enable;
return (O) this;
}
public O fetchSize(int fetchSize) {
this.fetchSize = new int[1];
this.fetchSize[0] = fetchSize;
return (O) this;
}
public O queryTimeoutMs(long ms) {
this.queryExecutionTimeout = ms;
this.queryTimeoutUnits = TimeUnit.MILLISECONDS;
return (O) this;
}
public O queryTimeout(long timeout, TimeUnit units) {
this.queryExecutionTimeout = timeout;
this.queryTimeoutUnits = units;
return (O) this;
}
public Statement options(Statement statement) {
if (defaultTimestamp != null) {
statement.setDefaultTimestamp(defaultTimestamp[0]);
}
if (consistencyLevel != null) {
statement.setConsistencyLevel(consistencyLevel);
}
if (serialConsistencyLevel != null) {
statement.setSerialConsistencyLevel(serialConsistencyLevel);
}
if (retryPolicy != null) {
statement.setRetryPolicy(retryPolicy);
}
if (enableTracing) {
statement.enableTracing();
} else {
statement.disableTracing();
}
if (fetchSize != null) {
statement.setFetchSize(fetchSize[0]);
}
if (idempotent) {
statement.setIdempotent(true);
}
return statement;
}
public O zipkinContext(TraceContext traceContext) {
if (traceContext != null) {
Tracer tracer = this.sessionOps.getZipkinTracer();
if (tracer != null) {
this.traceContext = traceContext;
}
}
return (O) this;
}
public Statement statement() {
return buildStatement(false);
}
public String cql() {
Statement statement = buildStatement(false);
if (statement == null)
return "";
if (statement instanceof BuiltStatement) {
BuiltStatement buildStatement = (BuiltStatement) statement;
return buildStatement.setForceNoValues(true).getQueryString();
} else {
return statement.toString();
}
}
public PreparedStatement prepareStatement() {
Statement statement = buildStatement(true);
if (statement instanceof RegularStatement) {
RegularStatement regularStatement = (RegularStatement) statement;
return sessionOps.prepare(regularStatement);
}
throw new HelenusException("only RegularStatements can be prepared");
}
public ListenableFuture<PreparedStatement> prepareStatementAsync() {
Statement statement = buildStatement(true);
if (statement instanceof RegularStatement) {
RegularStatement regularStatement = (RegularStatement) statement;
return sessionOps.prepareAsync(regularStatement);
}
throw new HelenusException("only RegularStatements can be prepared");
}
protected E checkCache(UnitOfWork<?> uow, Set<EntityIdentifyingFacet> facets, String[] statementKeys) {
E result = null;
if (!facets.isEmpty()) {
//TODO(gburd): what about select ResultSet, Tuple... etc.?
Optional<Either<Object, Set<Object>>> optionalCachedResult = uow.cacheLookupByFacet(facets);
if (optionalCachedResult.isPresent()) {
Either<Object, Set<Object>> eitherCachedResult = optionalCachedResult.get();
if (eitherCachedResult.isLeft()) {
uowCacheHits.mark();
logger.info("UnitOfWork({}) cache hit using facets", uow.hashCode());
result = (E) eitherCachedResult.getLeft();
}
}
} else {
// Then check to see if this query happens to uniquely identify a single object in the
// cache.
Optional<Either<Object, Set<Object>>> optionalCachedResult = uow.cacheLookupByStatement(statementKeys);
if (optionalCachedResult.isPresent()) {
Either<Object, Set<Object>> eitherCachedResult = optionalCachedResult.get();
// Statements always store Set<E> as the value in the cache.
if (eitherCachedResult.isRight()) {
Set<Object> cachedResult = eitherCachedResult.getRight();
if (cachedResult.size() == 1) {
Optional<Object> maybeResult = cachedResult.stream().findFirst();
if (maybeResult.isPresent()) {
uowCacheHits.mark();
logger.info("UnitOfWork({}) cache hit for stmt", uow.hashCode());
} else {
result = null;
}
}
}
}
}
if (result == null) {
uowCacheMiss.mark();
logger.info("UnitOfWork({}) cache miss", uow.hashCode());
}
return result;
}
protected void updateCache(UnitOfWork<?> uow, E pojo, String[] statementKeys) {
// Insert this entity into the cache for each facet for this entity that we can fully bind.
Map<String, EntityIdentifyingFacet> facetMap = Helenus.entity(pojo.getClass()).getIdentifyingFacets();
facetMap.forEach((facetName, facet) -> {
if (!facet.isFullyBound()) {
HelenusProperty prop = facet.getProperty();
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop);
facet.setValueForProperty(prop, value);
}
});
// Cache the value (pojo), the statement key, and the fully bound facets.
if (statementKeys != null) {
uow.cacheUpdate(Either.left(pojo), statementKeys,
facetMap.values()
.stream()
.filter(facet -> facet.isFullyBound())
.collect(Collectors.toSet()));
}
}
throw new HelenusException("only RegularStatements can be prepared");
}
}

View file

@ -21,13 +21,15 @@ import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
extends AbstractStatementOperation<E, O> {
@ -64,33 +66,35 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
}
}
public Stream<E> sync(UnitOfWork uow) throws TimeoutException {
if (uow == null) return sync();
public Stream<E> sync(UnitOfWork<?> uow) throws TimeoutException {
if (uow == null)
return sync();
final Timer.Context context = requestLatency.time();
try {
Stream<E> result = null;
String key = getStatementCacheKey();
if (enableCache && key != null) {
Set<E> cachedResult = (Set<E>) uow.cacheLookup(key);
if (cachedResult != null) {
//TODO(gburd): what about select ResultSet, Tuple... etc.?
uowCacheHits.mark();
logger.info("UOW({}) cache hit, {}", uow.hashCode());
result = cachedResult.stream();
} else {
uowCacheMiss.mark();
}
}
Stream<E> result = null;
E cachedResult = null;
String[] statementKeys = null;
if (result == null) {
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, true);
result = transform(resultSet);
if (key != null) {
uow.getCache().put(key, (Set<Object>) result);
if (enableCache) {
Set<EntityIdentifyingFacet> facets = getFacets();
statementKeys = getQueryKeys();
cachedResult = checkCache(uow, facets, statementKeys);
if (cachedResult != null) {
result = Stream.of(cachedResult);
}
}
if (result == null) {
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
result = transform(resultSet);
}
// If we have a result and we're caching then we need to put it into the cache for future requests to find.
if (enableCache && cachedResult != null) {
updateCache(uow, cachedResult, statementKeys);
}
}
return result;
} finally {
@ -106,7 +110,7 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
});
}
public CompletableFuture<Stream<E>> async(UnitOfWork uow) {
public CompletableFuture<Stream<E>> async(UnitOfWork<?> uow) {
if (uow == null) return async();
return CompletableFuture.<Stream<E>>supplyAsync(() -> {
try {

View file

@ -18,6 +18,9 @@ package net.helenus.core.operation;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Set;
import java.util.stream.Stream;
public final class BoundStreamOperation<E>
@ -34,10 +37,13 @@ public final class BoundStreamOperation<E>
}
@Override
public String getStatementCacheKey() {
return delegate.getStatementCacheKey();
public String[] getQueryKeys() {
return delegate.getQueryKeys();
}
@Override
public Set<EntityIdentifyingFacet> getFacets() { return delegate.getFacets(); }
@Override
public Stream<E> transform(ResultSet resultSet) {
return delegate.transform(resultSet);

View file

@ -240,7 +240,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
}
@Override
public String getStatementCacheKey() {
public String[] getQueryKeys() {
List<String> keys = new ArrayList<>(values.size());
values.forEach(
t -> {
@ -248,13 +248,13 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
switch (prop.getProperty().getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
keys.add(prop.getColumnName() + "==" + t._2.toString());
keys.add(entity.getName().toCql() + '.' + prop.getColumnName() + "==" + t._2.toString());
break;
default:
break;
}
});
return entity.getName() + ": " + Joiner.on(",").join(keys);
return keys.toArray(new String[keys.size()]);
}
@Override
@ -265,12 +265,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
T result = super.sync(uow);
Class<?> iface = entity.getMappingInterface();
if (resultType == iface) {
String key = getStatementCacheKey();
if (key != null) {
Set<Object> set = new HashSet<Object>(1);
set.add(result);
uow.getCache().put(key, set);
}
updateCache(uow, result, getQueryKeys());
}
return result;
}

View file

@ -16,6 +16,7 @@ import java.util.concurrent.TimeoutException;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.EntityIdentifyingFacet;
import net.helenus.support.HelenusException;
public abstract class Operation<E> {
@ -75,7 +76,12 @@ public abstract class Operation<E> {
return null;
}
public Set<EntityIdentifyingFacet> getIdentifyingFacets() {
public String[] getQueryKeys() {
return null;
}
public Set<EntityIdentifyingFacet> getFacets() {
return null;
}
}

View file

@ -17,7 +17,10 @@ package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
public final class SelectFirstOperation<E>
@ -38,8 +41,8 @@ public final class SelectFirstOperation<E>
}
@Override
public String getStatementCacheKey() {
return delegate.getStatementCacheKey();
public String[] getQueryKeys() {
return delegate.getQueryKeys();
}
@Override
@ -47,6 +50,9 @@ public final class SelectFirstOperation<E>
return delegate.buildStatement(cached);
}
@Override
public Set<EntityIdentifyingFacet> getFacets() { return delegate.getFacets(); }
@Override
public Optional<E> transform(ResultSet resultSet) {
return delegate.transform(resultSet).findFirst();

View file

@ -17,7 +17,10 @@ package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
public final class SelectFirstTransformingOperation<R, E>
@ -36,10 +39,13 @@ public final class SelectFirstTransformingOperation<R, E>
}
@Override
public String getStatementCacheKey() {
return delegate.getStatementCacheKey();
public String[] getQueryKeys() {
return delegate.getQueryKeys();
}
@Override
public Set<EntityIdentifyingFacet> getFacets() { return delegate.getFacets(); }
@Override
public BuiltStatement buildStatement(boolean cached) {
return delegate.buildStatement(cached);

View file

@ -15,6 +15,7 @@
*/
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.datastax.driver.core.querybuilder.Ordering;
@ -22,11 +23,13 @@ import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.querybuilder.Select.Selection;
import com.datastax.driver.core.querybuilder.Select.Where;
import com.google.common.base.Joiner;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import com.google.common.collect.Iterables;
import net.helenus.core.*;
import net.helenus.core.cache.EntityIdentifyingFacet;
import net.helenus.core.reflect.HelenusPropertyNode;
@ -194,28 +197,24 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
}
@Override
public Set<EntityIdentifyingFacet> getIdentityFacets() {
public Set<EntityIdentifyingFacet> getFacets() {
HelenusEntity entity = props.get(0).getEntity();
final Set<EntityIdentifyingFacet> facets = new HashSet<>(filters.size());
// Check to see if this select statement has enough information to build one or
// more identifying facets.
entity
.getIdentityFacets()
.getIdentifyingFacets()
.forEach(
(facetName, facet) -> {
EntityIdentifyingFacet boundFacet = null;
if (!facet.isFullyBound()) {
boundFacet = new EntityIdentifyingFacet(facet);
for (HelenusProperty prop : facet.getUnboundEntityProperties()) {
Filter filter = filters.get(facet.getProperty());
if (filter == null) {
break;
if (facet.isFullyBound()) {
facets.add(facet);
} else {
HelenusProperty prop = facet.getProperty();
Filter filter = filters.get(prop);
if (filter != null) {
facet.setValueForProperty(prop, filter.toString());
facets.add(facet);
}
boundFacet.setValueForProperty(prop, filter.toString());
}
}
if (boundFacet != null && boundFacet.isFullyBound()) {
facets.add(boundFacet);
}
});
return facets;
@ -245,7 +244,6 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
+ prop.getEntity().getMappingInterface());
}
/* TODO: is this useful information to gather when caching?
if (cached) {
switch (prop.getProperty().getColumnType()) {
case PARTITION_KEY:
@ -264,7 +262,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
break;
}
}
*/
}
if (entity == null) {

View file

@ -17,6 +17,9 @@ package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
@ -36,10 +39,13 @@ public final class SelectTransformingOperation<R, E>
}
@Override
public String getStatementCacheKey() {
return delegate.getStatementCacheKey();
public String[] getQueryKeys() {
return delegate.getQueryKeys();
}
@Override
public Set<EntityIdentifyingFacet> getFacets() { return delegate.getFacets(); }
@Override
public BuiltStatement buildStatement(boolean cached) {
return delegate.buildStatement(cached);

View file

@ -584,13 +584,9 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
return sync();
}
E result = super.sync(uow);
// TODO(gburd): Only drafted entity objects are updated in the cache at this time.
if (draft != null) {
String key = getStatementCacheKey();
if (key != null) {
Set<Object> set = new HashSet<Object>(1);
set.add(result);
uow.getCache().put(key, set);
}
updateCache(uow, result, getQueryKeys());
}
return result;
}

View file

@ -33,5 +33,5 @@ public interface HelenusEntity {
HelenusProperty getProperty(String name);
Map<String, EntityIdentifyingFacet> getIdentityFacets();
Map<String, EntityIdentifyingFacet> getIdentifyingFacets();
}

View file

@ -124,9 +124,6 @@ public final class HelenusMappingEntity implements HelenusEntity {
if (primaryProperties != null) {
primaryFacet =
new EntityIdentifyingFacet(
keyspace,
table,
schemaVersion,
primaryProperties.toArray(new HelenusProperty[props.size()]));
allFacetsBuilder.put("*", primaryFacet);
primaryProperties = null;
@ -134,7 +131,7 @@ public final class HelenusMappingEntity implements HelenusEntity {
Optional<IdentityName> optionalIndexName = prop.getIndexName();
if (optionalIndexName.isPresent()) {
EntityIdentifyingFacet facet =
new EntityIdentifyingFacet(keyspace, table, schemaVersion, prop);
new EntityIdentifyingFacet(prop);
ancillaryFacetsBuilder.put(prop.getPropertyName(), facet);
}
}
@ -174,6 +171,11 @@ public final class HelenusMappingEntity implements HelenusEntity {
return props.get(name);
}
@Override
public Map<String, EntityIdentifyingFacet> getIdentifyingFacets() {
return allIdentityFacets;
}
@Override
public IdentityName getName() {
return name;