Merge branch 'gburd/wip-facet-cache' into develop

This commit is contained in:
Greg Burd 2017-10-20 12:43:20 -04:00
commit e3ec76e6da
58 changed files with 803 additions and 225 deletions

36
NOTES
View file

@ -1,3 +1,8 @@
--- Cache
// `E` is the type of the Entity class or one of:
// - ResultSet
@ -315,3 +320,34 @@ begin:
}
};
}
----------------------------------
if ("ttl".equals(methodName) && method.getParameterCount() == 1 && method.getReturnType() == int.class) {
Getter getter = (Getter) args[0];
if (getter == null) {
return false;
}
HelenusProperty prop = MappingUtil.resolveMappingProperty(getter).getProperty();
String getterName = prop.getPropertyName();
String ttlKeyForProperty = prop.getColumnName().toCql() + "_ttl";
if (src.containsKey(ttlKeyForProperty)) {
return src.get(ttlKeyForProperty);
} else {
return 0;
}
}
if ("written".equals(methodName) && method.getParameterCount() == 1 && method.getReturnType() == int.class) {
Getter getter = (Getter) args[0];
if (getter == null) {
return false;
}
HelenusProperty prop = MappingUtil.resolveMappingProperty(getter).getProperty();
String getterName = prop.getPropertyName();
String ttlKeyForProperty = prop.getColumnName().toCql() + "_ttl";
if (src.containsKey(ttlKeyForProperty)) {
return src.get(ttlKeyForProperty);
} else {
return 0;
}
}

View file

@ -64,7 +64,7 @@ dependencies {
compile group: 'org.aspectj', name: 'aspectjweaver', version: '1.8.10'
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.6'
compile group: 'org.springframework', name: 'spring-core', version: '4.3.10.RELEASE'
compile group: 'org.ahocorasick', name: 'ahocorasick', version: '0.4.0'
compile group: 'com.google.guava', name: 'guava', version: '20.0'
compile group: 'com.diffplug.durian', name: 'durian', version: '3.+'
compile group: 'io.zipkin.java', name: 'zipkin', version: '1.29.2'

View file

@ -18,18 +18,24 @@ package net.helenus.core;
import java.util.*;
import com.diffplug.common.base.Errors;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.google.common.collect.TreeTraverser;
import net.helenus.core.cache.Facet;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfWork, AutoCloseable {
public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfWork<E>, AutoCloseable {
private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>();
private final HelenusSession session;
private final AbstractUnitOfWork<E> parent;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
private final Map<String, Set<Object>> cache = new HashMap<String, Set<Object>>();
private boolean aborted = false;
private boolean committed = false;
// Cache:
private final Table<String, String, Object> cache = HashBasedTable.create();
protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork<E> parent) {
Objects.requireNonNull(session, "containing session cannot be null");
@ -37,14 +43,15 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
this.parent = parent;
}
public UnitOfWork addNestedUnitOfWork(UnitOfWork uow) {
@Override
public void addNestedUnitOfWork(UnitOfWork<E> uow) {
synchronized (nested) {
nested.add((AbstractUnitOfWork<E>) uow);
}
return this;
}
public UnitOfWork begin() {
@Override
public UnitOfWork<E> begin() {
// log.record(txn::start)
return this;
}
@ -57,20 +64,41 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
}
}
public Set<Object> cacheLookup(String key) {
Set<Object> r = getCache().get(key);
if (r != null) {
return r;
} else {
if (parent != null) {
return parent.cacheLookup(key);
@Override
public Optional<Object> cacheLookup(List<Facet> facets) {
Facet table = facets.remove(0);
String tableName = table.value().toString();
Optional<Object> result = Optional.empty();
for (Facet facet : facets) {
String columnName = facet.name() + "==" + facet.value();
Object value = cache.get(tableName, columnName);
if (value != null) {
if (result.isPresent() && result.get() != value) {
// One facet matched, but another did not.
result = Optional.empty();
break;
} else {
result = Optional.of(value);
}
}
}
return null;
if (!result.isPresent()) {
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
if (parent != null) {
return parent.cacheLookup(facets);
}
}
return result;
}
public Map<String, Set<Object>> getCache() {
return cache;
@Override
public void cacheUpdate(Object value, List<Facet> facets) {
Facet table = facets.remove(0);
String tableName = table.value().toString();
for (Facet facet : facets) {
String columnName = facet.name() + "==" + facet.value();
cache.put(tableName, columnName, value);
}
}
private Iterator<AbstractUnitOfWork<E>> getChildNodes() {
@ -108,25 +136,14 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
committed = true;
aborted = false;
// TODO(gburd): union this cache with parent's (if there is a parent) or with
// the session cache for all cacheable entities we currently hold
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
// Merge UOW cache into parent's cache.
if (parent != null) {
Map<String, Set<Object>> parentCache = parent.getCache();
for (String key : cache.keySet()) {
if (parentCache.containsKey(key)) {
// merge the sets
Set<Object> ps = parentCache.get(key);
ps.addAll(cache.get(key)); // TODO(gburd): review this, likely not correct in all cases as-is.
} else {
// add the missing set
parentCache.put(key, cache.get(key));
}
}
}
parent.mergeCache(cache);
} // else {
// TODO... merge into session cache objects marked cacheable
// }
// Apply all post-commit functions for
if (parent == null) {
@ -154,6 +171,23 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
// cache.invalidateSince(txn::start time)
}
private void mergeCache(Table<String, String, Object> from) {
Table<String, String, Object> to = this.cache;
from.rowMap().forEach((rowKey, columnMap) -> {
columnMap.forEach((columnKey, value) -> {
if (to.contains(rowKey, columnKey)) {
to.put(rowKey, columnKey, merge(to.get(rowKey, columnKey), from.get(rowKey, columnKey)));
} else {
to.put(rowKey, columnKey, from.get(rowKey, columnKey));
}
});
});
}
private Object merge(Object to, Object from) {
return to; // TODO(gburd): yeah...
}
public String describeConflicts() {
return "it's complex...";
}

View file

@ -105,6 +105,10 @@ public final class Filter<V> {
return new Filter<V>(node, postulate);
}
public V[] postulateValues() {
return postulate.values();
}
@Override
public String toString() {
return node.getColumnName() + postulate.toString();

View file

@ -71,6 +71,10 @@ public final class Postulate<V> {
}
}
public V[] values() {
return values;
}
@Override
public String toString() {

View file

@ -140,6 +140,28 @@ public final class SchemaUtil {
return SchemaBuilder.dropType(type.getTypeName()).ifExists();
}
public static String createPrimaryKeyPhrase(Collection<HelenusProperty> properties) {
List<String> p = new ArrayList<String>(properties.size());
List<String> c = new ArrayList<String>(properties.size());
for (HelenusProperty prop : properties) {
String columnName = prop.getColumnName().toCql();
switch (prop.getColumnType()) {
case PARTITION_KEY :
p.add(columnName);
break;
case CLUSTERING_COLUMN :
c.add(columnName);
break;
default :
break;
}
}
return "(" + ((p.size() > 1) ? "(" + String.join(", ", p) + ")" : p.get(0))
+ ((c.size() > 0) ? ", " + ((c.size() > 1) ? "(" + String.join(", ", c) + ")" : c.get(0)) : "") + ")";
}
public static SchemaStatement createMaterializedView(String keyspace, String viewName, HelenusEntity entity) {
if (entity.getType() != HelenusEntityType.VIEW) {
throw new HelenusMappingException("expected view entity " + entity);
@ -162,20 +184,16 @@ public final class SchemaUtil {
Class<?> iface = entity.getMappingInterface();
String tableName = Helenus.entity(iface.getInterfaces()[0]).getName().toCql();
Select.Where where = selection.from(tableName).where();
List<String> p = new ArrayList<String>(props.size());
List<String> c = new ArrayList<String>(props.size());
List<String> o = new ArrayList<String>(props.size());
for (HelenusPropertyNode prop : props) {
String columnName = prop.getColumnName();
switch (prop.getProperty().getColumnType()) {
case PARTITION_KEY :
p.add(columnName);
where = where.and(new IsNotNullClause(columnName));
break;
case CLUSTERING_COLUMN :
c.add(columnName);
where = where.and(new IsNotNullClause(columnName));
ClusteringColumn clusteringColumn = prop.getProperty().getGetterMethod()
@ -189,8 +207,7 @@ public final class SchemaUtil {
}
}
String primaryKey = "PRIMARY KEY (" + ((p.size() > 1) ? "(" + String.join(", ", p) + ")" : p.get(0))
+ ((c.size() > 0) ? ", " + ((c.size() > 1) ? "(" + String.join(", ", c) + ")" : c.get(0)) : "") + ")";
String primaryKey = "PRIMARY KEY " + createPrimaryKeyPhrase(entity.getOrderedProperties());
String clustering = "";
if (o.size() > 0) {

View file

@ -264,7 +264,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
}
DslExportable dsl = (DslExportable) Helenus.dsl(iface);
dsl.setCassandraMetadataForHelenusSesion(session.getCluster().getMetadata());
dsl.setCassandraMetadataForHelenusSession(session.getCluster().getMetadata());
sessionRepository.add(dsl);
});

View file

@ -15,10 +15,12 @@
*/
package net.helenus.core;
import java.util.Map;
import java.util.Set;
import java.util.List;
import java.util.Optional;
public interface UnitOfWork<E extends Exception> extends AutoCloseable {
import net.helenus.core.cache.Facet;
public interface UnitOfWork<X extends Exception> extends AutoCloseable {
/**
* Marks the beginning of a transactional section of work. Will write a record
@ -26,9 +28,9 @@ public interface UnitOfWork<E extends Exception> extends AutoCloseable {
*
* @return the handle used to commit or abort the work.
*/
UnitOfWork begin();
UnitOfWork<X> begin();
UnitOfWork addNestedUnitOfWork(UnitOfWork uow);
void addNestedUnitOfWork(UnitOfWork<X> uow);
/**
* Checks to see if the work performed between calling begin and now can be
@ -36,10 +38,10 @@ public interface UnitOfWork<E extends Exception> extends AutoCloseable {
*
* @return a function from which to chain work that only happens when commit is
* successful
* @throws E
* @throws X
* when the work overlaps with other concurrent writers.
*/
PostCommitFunction<Void, Void> commit() throws E;
PostCommitFunction<Void, Void> commit() throws X;
/**
* Explicitly abort the work within this unit of work. Any nested aborted unit
@ -51,8 +53,7 @@ public interface UnitOfWork<E extends Exception> extends AutoCloseable {
boolean hasCommitted();
// Either<Object, Set<Object>> cacheLookup(String key);
Set<Object> cacheLookup(String key);
Optional<Object> cacheLookup(List<Facet> facets);
Map<String, Set<Object>> getCache();
void cacheUpdate(Object pojo, List<Facet> facets);
}

View file

@ -1,3 +1,18 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.annotation;
import java.lang.annotation.ElementType;

View file

@ -0,0 +1,83 @@
package net.helenus.core.aspect;
import java.lang.reflect.Method;
import java.util.Arrays;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.Assert;
import net.helenus.core.annotation.Retry;
@Aspect
public class RetryAspect {
private static final Logger log = LoggerFactory.getLogger(RetryAspect.class);
@Around("@annotation(net.helenus.core.annotations.Retry)")
public Object retry(ProceedingJoinPoint pjp) throws Throwable {
Retry retryAnnotation = getRetryAnnotation(pjp);
return (retryAnnotation != null) ? proceed(pjp, retryAnnotation) : proceed(pjp);
}
private Object proceed(ProceedingJoinPoint pjp) throws Throwable {
return pjp.proceed();
}
private Object proceed(ProceedingJoinPoint pjp, Retry retryAnnotation) throws Throwable {
int times = retryAnnotation.times();
Class<? extends Throwable>[] retryOn = retryAnnotation.on();
Assert.isTrue(times > 0, "@Retry{times} should be greater than 0!");
Assert.isTrue(retryOn.length > 0, "@Retry{on} should have at least one Throwable!");
log.info("Proceed with {} retries on {}", times, Arrays.toString(retryOn));
return tryProceeding(pjp, times, retryOn);
}
private Object tryProceeding(ProceedingJoinPoint pjp, int times, Class<? extends Throwable>[] retryOn)
throws Throwable {
try {
return proceed(pjp);
} catch (Throwable throwable) {
if (isRetryThrowable(throwable, retryOn) && times-- > 0) {
log.info("Conflict detected, {} remaining retries on {}", times, Arrays.toString(retryOn));
return tryProceeding(pjp, times, retryOn);
}
throw throwable;
}
}
private boolean isRetryThrowable(Throwable throwable, Class<? extends Throwable>[] retryOn) {
Throwable[] causes = ExceptionUtils.getThrowables(throwable);
for (Throwable cause : causes) {
for (Class<? extends Throwable> retryThrowable : retryOn) {
if (retryThrowable.isAssignableFrom(cause.getClass())) {
return true;
}
}
}
return false;
}
private Retry getRetryAnnotation(ProceedingJoinPoint pjp) throws NoSuchMethodException {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
Retry retryAnnotation = AnnotationUtils.findAnnotation(method, Retry.class);
if (retryAnnotation != null) {
return retryAnnotation;
}
Class[] argClasses = new Class[pjp.getArgs().length];
for (int i = 0; i < pjp.getArgs().length; i++) {
argClasses[i] = pjp.getArgs()[i].getClass();
}
method = pjp.getTarget().getClass().getMethod(pjp.getSignature().getName(), argClasses);
return AnnotationUtils.findAnnotation(method, Retry.class);
}
}

View file

@ -0,0 +1,38 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.cache;
import java.util.Map;
import java.util.stream.Collectors;
import net.helenus.mapping.HelenusProperty;
public class BoundFacet extends Facet<String> {
private final Map<HelenusProperty, Object> properties;
BoundFacet(String name, Map<HelenusProperty, Object> properties) {
super(name,
(properties.keySet().size() > 1)
? "[" + String.join(", ",
properties.keySet().stream().map(key -> properties.get(key).toString())
.collect(Collectors.toSet()))
+ "]"
: String.join("", properties.keySet().stream().map(key -> properties.get(key).toString())
.collect(Collectors.toSet())));
this.properties = properties;
}
}

View file

@ -0,0 +1,56 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.cache;
/**
* An Entity is identifiable via one or more Facets
*
* A Facet is is a set of Properties and bound Facets
*
* An Entity will have it's Keyspace, Table and Schema Version Facets bound.
*
* A property may also have a TTL or write time bound.
*
* The cache contains key->value mappings of merkel-hash -> Entity or
* Set<Entity> The only way a Set<Entity> is put into the cache is with a key =
* hash([Entity's bound Facets, hash(filter clause from SELECT)])
*
* REMEMBER to update the cache on build() for all impacted facets, delete
* existing keys and add new keys
*/
public class Facet<T> {
private final String name;
private T value;
public Facet(String name) {
this.name = name;
}
public Facet(String name, T value) {
this.name = name;
this.value = value;
}
public String name() {
return name;
}
public T value() {
return value;
}
}

View file

@ -0,0 +1,74 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.cache;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.helenus.core.SchemaUtil;
import net.helenus.mapping.HelenusProperty;
public class UnboundFacet extends Facet<String> {
private final List<HelenusProperty> properties;
public UnboundFacet(List<HelenusProperty> properties) {
super(SchemaUtil.createPrimaryKeyPhrase(properties));
this.properties = properties;
}
public UnboundFacet(HelenusProperty property) {
super(property.getPropertyName());
properties = new ArrayList<HelenusProperty>();
properties.add(property);
}
public List<HelenusProperty> getProperties() {
return properties;
}
public Binder binder() {
return new Binder(name(), properties);
}
public static class Binder {
private final String name;
private final List<HelenusProperty> properties = new ArrayList<HelenusProperty>();
private Map<HelenusProperty, Object> boundProperties = new HashMap<HelenusProperty, Object>();
Binder(String name, List<HelenusProperty> properties) {
this.name = name;
this.properties.addAll(properties);
}
public Binder setValueForProperty(HelenusProperty prop, Object value) {
properties.remove(prop);
boundProperties.put(prop, value);
return this;
}
public boolean isBound() {
return properties.isEmpty();
}
public BoundFacet bind() {
return new BoundFacet(name, boundProperties);
}
}
}

View file

@ -27,16 +27,16 @@ import net.helenus.core.UnitOfWork;
public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> extends AbstractStatementOperation<E, O> {
public AbstractOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
}
public abstract E transform(ResultSet resultSet);
public boolean cacheable() {
return false;
}
public AbstractOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
}
public PreparedOperation<E> prepare() {
return new PreparedOperation<E>(prepareStatement(), this);
}

View file

@ -15,9 +15,8 @@
*/
package net.helenus.core.operation;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
@ -29,8 +28,16 @@ import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.Facet;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends
@ -68,41 +75,39 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
}
}
public Optional<E> sync(UnitOfWork uow) throws TimeoutException {
public Optional<E> sync(UnitOfWork<?> uow) throws TimeoutException {
if (uow == null)
return sync();
final Timer.Context context = requestLatency.time();
try {
Optional<E> result = null;
String key = getStatementCacheKey();
if (enableCache && key != null) {
Set<E> cachedResult = (Set<E>) uow.cacheLookup(key);
if (cachedResult != null) {
// TODO(gburd): what about select ResultSet, Tuple... etc.?
uowCacheHits.mark();
logger.info("UOW({}) cache hit, {}", uow.hashCode(), key);
result = cachedResult.stream().findFirst();
} else {
uowCacheMiss.mark();
Optional<E> result = Optional.empty();
E cacheResult = null;
String[] statementKeys = null;
if (enableCache) {
List<Facet> facets = bindFacetValues();
cacheResult = checkCache(uow, facets);
if (cacheResult != null) {
result = Optional.of(cacheResult);
}
}
if (result == null) {
if (!result.isPresent()) {
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
result = transform(resultSet);
if (key != null) {
if (result.isPresent()) {
Set<Object> set = new HashSet<Object>(1);
set.add(result.get());
uow.getCache().put(key, set);
} else {
uow.getCache().put(key, new HashSet<Object>(0));
}
}
// Transform the query result set into the desired shape.
result = transform(resultSet);
}
// If we have a result, it wasn't from cache, and we're caching things then we
// need to put this result
// into the cache for future requests to find.
if (enableCache && cacheResult == null && result.isPresent()) {
updateCache(uow, result.get(), getFacets());
}
return result;
@ -121,7 +126,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
});
}
public CompletableFuture<Optional<E>> async(UnitOfWork uow) {
public CompletableFuture<Optional<E>> async(UnitOfWork<?> uow) {
if (uow == null)
return async();
return CompletableFuture.<Optional<E>>supplyAsync(() -> {

View file

@ -15,6 +15,10 @@
*/
package net.helenus.core.operation;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@ -34,17 +38,21 @@ import com.google.common.util.concurrent.ListenableFuture;
import brave.Tracer;
import brave.propagation.TraceContext;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.UnboundFacet;
import net.helenus.core.reflect.MapExportable;
import net.helenus.mapping.value.BeanColumnValueProvider;
import net.helenus.support.HelenusException;
public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>> extends Operation<E> {
final Logger logger = LoggerFactory.getLogger(getClass());
public abstract Statement buildStatement(boolean cached);
protected boolean enableCache = true;
protected boolean showValues = true;
protected TraceContext traceContext;
long queryExecutionTimeout = 10;
TimeUnit queryTimeoutUnits = TimeUnit.SECONDS;
private ConsistencyLevel consistencyLevel;
private ConsistencyLevel serialConsistencyLevel;
private RetryPolicy retryPolicy;
@ -52,8 +60,6 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
private boolean enableTracing = false;
private long[] defaultTimestamp = null;
private int[] fetchSize = null;
long queryExecutionTimeout = 10;
TimeUnit queryTimeoutUnits = TimeUnit.SECONDS;
public AbstractStatementOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
@ -61,6 +67,8 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
this.idempotent = sessionOperations.getDefaultQueryIdempotency();
}
public abstract Statement buildStatement(boolean cached);
public O ignoreCache(boolean enabled) {
enableCache = enabled;
return (O) this;
@ -309,4 +317,51 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
throw new HelenusException("only RegularStatements can be prepared");
}
protected E checkCache(UnitOfWork<?> uow, List<Facet> facets) {
E result = null;
Optional<Object> optionalCachedResult = Optional.empty();
if (!facets.isEmpty()) {
optionalCachedResult = uow.cacheLookup(facets);
if (optionalCachedResult.isPresent()) {
uowCacheHits.mark();
logger.info("UnitOfWork({}) cache hit using facets", uow.hashCode());
result = (E) optionalCachedResult.get();
}
}
if (result == null) {
uowCacheMiss.mark();
logger.info("UnitOfWork({}) cache miss", uow.hashCode());
}
return result;
}
protected void updateCache(UnitOfWork<?> uow, E pojo, List<Facet> identifyingFacets) {
List<Facet> facets = new ArrayList<>();
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
for (Facet facet : identifyingFacets) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(prop, value.toString());
} else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
}
facets.add(binder.bind());
});
} else {
facets.add(facet);
}
}
// Cache the value (pojo), the statement key, and the fully bound facets.
uow.cacheUpdate(pojo, facets);
}
}

View file

@ -15,7 +15,7 @@
*/
package net.helenus.core.operation;
import java.util.Set;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.Facet;
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
extends
@ -67,23 +68,20 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
}
}
public Stream<E> sync(UnitOfWork uow) throws TimeoutException {
public Stream<E> sync(UnitOfWork<?> uow) throws TimeoutException {
if (uow == null)
return sync();
final Timer.Context context = requestLatency.time();
try {
Stream<E> result = null;
String key = getStatementCacheKey();
if (enableCache && key != null) {
Set<E> cachedResult = (Set<E>) uow.cacheLookup(key);
E cachedResult = null;
if (enableCache) {
List<Facet> facets = bindFacetValues();
cachedResult = checkCache(uow, facets);
if (cachedResult != null) {
// TODO(gburd): what about select ResultSet, Tuple... etc.?
uowCacheHits.mark();
logger.info("UOW({}) cache hit, {}", uow.hashCode());
result = cachedResult.stream();
} else {
uowCacheMiss.mark();
result = Stream.of(cachedResult);
}
}
@ -91,10 +89,12 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
result = transform(resultSet);
}
if (key != null) {
uow.getCache().put(key, (Set<Object>) result);
}
// If we have a result and we're caching then we need to put it into the cache
// for future requests to find.
if (enableCache && cachedResult != null) {
updateCache(uow, cachedResult, getFacets());
}
return result;
@ -113,7 +113,7 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
});
}
public CompletableFuture<Stream<E>> async(UnitOfWork uow) {
public CompletableFuture<Stream<E>> async(UnitOfWork<?> uow) {
if (uow == null)
return async();
return CompletableFuture.<Stream<E>>supplyAsync(() -> {

View file

@ -15,12 +15,15 @@
*/
package net.helenus.core.operation;
import java.util.List;
import java.util.stream.Stream;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
import net.helenus.core.cache.Facet;
public final class BoundStreamOperation<E> extends AbstractStreamOperation<E, BoundStreamOperation<E>> {
private final BoundStatement boundStatement;
@ -33,8 +36,8 @@ public final class BoundStreamOperation<E> extends AbstractStreamOperation<E, Bo
}
@Override
public String getStatementCacheKey() {
return delegate.getStatementCacheKey();
public List<Facet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override

View file

@ -23,13 +23,13 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.base.Joiner;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.Getter;
import net.helenus.core.Helenus;
import net.helenus.core.UnitOfWork;
import net.helenus.core.reflect.DefaultPrimitiveTypes;
import net.helenus.core.reflect.Drafted;
import net.helenus.core.reflect.HelenusPropertyNode;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusProperty;
@ -166,6 +166,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
Class<?> iface = entity.getMappingInterface();
if (resultType == iface) {
if (values.size() > 0) {
boolean immutable = iface.isAssignableFrom(Drafted.class);
Collection<HelenusProperty> properties = entity.getOrderedProperties();
Map<String, Object> backingMap = new HashMap<String, Object>(properties.size());
@ -187,7 +188,8 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
// If we started this operation with an instance of this type, use values from
// that.
if (pojo != null) {
backingMap.put(key, BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop));
backingMap.put(key,
BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, immutable));
} else {
// Otherwise we'll use default values for the property type if available.
Class<?> propType = prop.getJavaType();
@ -234,23 +236,6 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
}
}
@Override
public String getStatementCacheKey() {
List<String> keys = new ArrayList<>(values.size());
values.forEach(t -> {
HelenusPropertyNode prop = t._1;
switch (prop.getProperty().getColumnType()) {
case PARTITION_KEY :
case CLUSTERING_COLUMN :
keys.add(prop.getColumnName() + "==" + t._2.toString());
break;
default :
break;
}
});
return entity.getName() + ": " + Joiner.on(",").join(keys);
}
@Override
public T sync(UnitOfWork uow) throws TimeoutException {
if (uow == null) {
@ -259,12 +244,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
T result = super.sync(uow);
Class<?> iface = entity.getMappingInterface();
if (resultType == iface) {
String key = getStatementCacheKey();
if (key != null) {
Set<Object> set = new HashSet<Object>(1);
set.add(result);
uow.getCache().put(key, set);
}
updateCache(uow, result, entity.getFacets());
}
return result;
}

View file

@ -1,5 +1,21 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.operation;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -15,6 +31,7 @@ import brave.Tracer;
import brave.propagation.TraceContext;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.Facet;
public abstract class Operation<E> {
@ -68,7 +85,12 @@ public abstract class Operation<E> {
return null;
}
public String getStatementCacheKey() {
public List<Facet> getFacets() {
return null;
}
public List<Facet> bindFacetValues() {
return null;
}
}

View file

@ -15,12 +15,15 @@
*/
package net.helenus.core.operation;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import net.helenus.core.cache.Facet;
public final class SelectFirstOperation<E> extends AbstractFilterOptionalOperation<E, SelectFirstOperation<E>> {
private final SelectOperation<E> delegate;
@ -38,13 +41,18 @@ public final class SelectFirstOperation<E> extends AbstractFilterOptionalOperati
}
@Override
public String getStatementCacheKey() {
return delegate.getStatementCacheKey();
public BuiltStatement buildStatement(boolean cached) {
return delegate.buildStatement(cached);
}
@Override
public BuiltStatement buildStatement(boolean cached) {
return delegate.buildStatement(cached);
public List<Facet> getFacets() {
return delegate.getFacets();
}
@Override
public List<Facet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override

View file

@ -15,12 +15,15 @@
*/
package net.helenus.core.operation;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import net.helenus.core.cache.Facet;
public final class SelectFirstTransformingOperation<R, E>
extends
AbstractFilterOptionalOperation<R, SelectFirstTransformingOperation<R, E>> {
@ -38,8 +41,8 @@ public final class SelectFirstTransformingOperation<R, E>
}
@Override
public String getStatementCacheKey() {
return delegate.getStatementCacheKey();
public List<Facet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override

View file

@ -28,9 +28,11 @@ import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.querybuilder.Select.Selection;
import com.datastax.driver.core.querybuilder.Select.Where;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import net.helenus.core.*;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.UnboundFacet;
import net.helenus.core.reflect.HelenusPropertyNode;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.MappingUtil;
@ -42,9 +44,8 @@ import net.helenus.support.HelenusMappingException;
public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, SelectOperation<E>> {
protected Function<Row, E> rowMapper = null;
protected final List<HelenusPropertyNode> props = new ArrayList<HelenusPropertyNode>();
protected Function<Row, E> rowMapper = null;
protected List<Ordering> ordering = null;
protected Integer limit = null;
protected boolean allowFiltering = false;
@ -176,29 +177,38 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
}
@Override
public String getStatementCacheKey() {
List<String> keys = new ArrayList<>(filters.size());
public List<Facet> getFacets() {
HelenusEntity entity = props.get(0).getEntity();
return entity.getFacets();
}
for (HelenusPropertyNode prop : props) {
switch (prop.getProperty().getColumnType()) {
case PARTITION_KEY :
case CLUSTERING_COLUMN :
Filter filter = filters.get(prop.getProperty());
@Override
public List<Facet> bindFacetValues() {
HelenusEntity entity = props.get(0).getEntity();
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
Filter filter = filters.get(prop);
if (filter != null) {
keys.add(filter.toString());
} else {
return null;
Object[] postulates = filter.postulateValues();
for (Object p : postulates) {
binder.setValueForProperty(prop, p.toString());
}
}
break;
default :
if (keys.size() > 0) {
return entity.getName() + ": " + Joiner.on(",").join(keys);
}
return null;
});
if (binder.isBound()) {
boundFacets.add(binder.bind());
}
} else {
boundFacets.add(facet);
}
}
return null;
return boundFacets;
}
@Override
@ -222,16 +232,24 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
+ entity.getMappingInterface() + " or " + prop.getEntity().getMappingInterface());
}
/*
* TODO: is this useful information to gather when caching? if (cached) { switch
* (prop.getProperty().getColumnType()) { case PARTITION_KEY: case
* CLUSTERING_COLUMN: break; default: if (entity.equals(prop.getEntity())) { if
* (prop.getNext().isPresent()) { columnName =
* Iterables.getLast(prop).getColumnName().toCql(true); } if
* (!prop.getProperty().getDataType().isCollectionType()) {
* selection.writeTime(columnName).as(columnName + "_writeTime");
* selection.ttl(columnName).as(columnName + "_ttl"); } } break; } }
*/
if (cached) {
switch (prop.getProperty().getColumnType()) {
case PARTITION_KEY :
case CLUSTERING_COLUMN :
break;
default :
if (entity.equals(prop.getEntity())) {
if (prop.getNext().isPresent()) {
columnName = Iterables.getLast(prop).getColumnName().toCql(true);
}
if (!prop.getProperty().getDataType().isCollectionType()) {
selection.writeTime(columnName).as(columnName + "_writeTime");
selection.ttl(columnName).as(columnName + "_ttl");
}
}
break;
}
}
}
if (entity == null) {

View file

@ -15,12 +15,15 @@
*/
package net.helenus.core.operation;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import net.helenus.core.cache.Facet;
public final class SelectTransformingOperation<R, E>
extends
AbstractFilterStreamOperation<R, SelectTransformingOperation<R, E>> {
@ -38,8 +41,13 @@ public final class SelectTransformingOperation<R, E>
}
@Override
public String getStatementCacheKey() {
return delegate.getStatementCacheKey();
public List<Facet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override
public List<Facet> getFacets() {
return delegate.getFacets();
}
@Override

View file

@ -575,13 +575,10 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
return sync();
}
E result = super.sync(uow);
// TODO(gburd): Only drafted entity objects are updated in the cache at this
// time.
if (draft != null) {
String key = getStatementCacheKey();
if (key != null) {
Set<Object> set = new HashSet<Object>(1);
set.add(result);
uow.getCache().put(key, set);
}
updateCache(uow, result, getFacets());
}
return result;
}

View file

@ -1,3 +1,18 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.reflect;
import java.util.Set;

View file

@ -21,13 +21,13 @@ import net.helenus.mapping.HelenusEntity;
public interface DslExportable {
public static final String GET_ENTITY_METHOD = "getHelenusMappingEntity";
public static final String GET_PARENT_METHOD = "getParentDslHelenusPropertyNode";
public static final String SET_METADATA_METHOD = "setCassandraMetadataForHelenusSesion";
String GET_ENTITY_METHOD = "getHelenusMappingEntity";
String GET_PARENT_METHOD = "getParentDslHelenusPropertyNode";
String SET_METADATA_METHOD = "setCassandraMetadataForHelenusSession";
HelenusEntity getHelenusMappingEntity();
HelenusPropertyNode getParentDslHelenusPropertyNode();
void setCassandraMetadataForHelenusSesion(Metadata metadata);
void setCassandraMetadataForHelenusSession(Metadata metadata);
}

View file

@ -58,7 +58,7 @@ public class DslInvocationHandler<E> implements InvocationHandler {
this.classLoader = classLoader;
}
public void setCassandraMetadataForHelenusSesion(Metadata metadata) {
public void setCassandraMetadataForHelenusSession(Metadata metadata) {
if (metadata != null) {
this.metadata = metadata;
entity = init(metadata);
@ -118,7 +118,7 @@ public class DslInvocationHandler<E> implements InvocationHandler {
if (DslExportable.SET_METADATA_METHOD.equals(methodName) && args.length == 1 && args[0] instanceof Metadata) {
if (metadata == null) {
this.setCassandraMetadataForHelenusSesion((Metadata) args[0]);
this.setCassandraMetadataForHelenusSession((Metadata) args[0]);
}
return null;
}

View file

@ -75,7 +75,12 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
return false;
}
if (Proxy.isProxyClass(otherObj.getClass())) {
return this == Proxy.getInvocationHandler(otherObj);
if (this == Proxy.getInvocationHandler(otherObj)) {
return true;
}
}
if (otherObj instanceof MapExportable && src.equals(((MapExportable) otherObj).toMap())) {
return true;
}
return false;
}

View file

@ -16,6 +16,9 @@
package net.helenus.mapping;
import java.util.Collection;
import java.util.List;
import net.helenus.core.cache.Facet;
public interface HelenusEntity {
@ -30,4 +33,6 @@ public interface HelenusEntity {
Collection<HelenusProperty> getOrderedProperties();
HelenusProperty getProperty(String name);
List<Facet> getFacets();
}

View file

@ -28,6 +28,8 @@ import com.google.common.collect.ImmutableMap;
import net.helenus.config.HelenusSettings;
import net.helenus.core.Helenus;
import net.helenus.core.annotation.Cacheable;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.UnboundFacet;
import net.helenus.mapping.annotation.*;
import net.helenus.support.HelenusMappingException;
@ -40,6 +42,7 @@ public final class HelenusMappingEntity implements HelenusEntity {
private final ImmutableMap<String, Method> methods;
private final ImmutableMap<String, HelenusProperty> props;
private final ImmutableList<HelenusProperty> orderedProps;
private final List<Facet> facets;
public HelenusMappingEntity(Class<?> iface, Metadata metadata) {
this(iface, autoDetectType(iface), metadata);
@ -105,7 +108,34 @@ public final class HelenusMappingEntity implements HelenusEntity {
validateOrdinals();
// Caching
cacheable = (null != iface.getDeclaredAnnotation(Cacheable.class));
List<HelenusProperty> primaryKeyProperties = new ArrayList<>();
ImmutableList.Builder<Facet> facetsBuilder = ImmutableList.builder();
facetsBuilder.add(new Facet("table", name.toCql()));
for (HelenusProperty prop : orderedProps) {
switch (prop.getColumnType()) {
case PARTITION_KEY :
case CLUSTERING_COLUMN :
primaryKeyProperties.add(prop);
break;
default :
if (primaryKeyProperties != null && primaryKeyProperties.size() > 0) {
facetsBuilder.add(new UnboundFacet(primaryKeyProperties));
primaryKeyProperties = null;
}
Optional<IdentityName> optionalIndexName = prop.getIndexName();
if (optionalIndexName.isPresent()) {
UnboundFacet facet = new UnboundFacet(prop);
facetsBuilder.add(facet);
}
}
}
if (primaryKeyProperties != null && primaryKeyProperties.size() > 0) {
facetsBuilder.add(new UnboundFacet(primaryKeyProperties));
}
this.facets = facetsBuilder.build();
}
@Override
@ -138,6 +168,11 @@ public final class HelenusMappingEntity implements HelenusEntity {
return props.get(name);
}
@Override
public List<Facet> getFacets() {
return facets;
}
@Override
public IdentityName getName() {
return name;

View file

@ -26,7 +26,7 @@ public enum BeanColumnValueProvider implements ColumnValueProvider {
INSTANCE;
@Override
public <V> V getColumnValue(Object bean, int columnIndexUnused, HelenusProperty property) {
public <V> V getColumnValue(Object bean, int columnIndexUnused, HelenusProperty property, boolean immutable) {
Method getter = property.getGetterMethod();

View file

@ -23,7 +23,11 @@ import net.helenus.mapping.HelenusProperty;
public interface ColumnValueProvider {
<V> V getColumnValue(Object source, int columnIndex, HelenusProperty property);
<V> V getColumnValue(Object source, int columnIndex, HelenusProperty property, boolean immutable);
default <V> V getColumnValue(Object source, int columnIndex, HelenusProperty property) {
return getColumnValue(source, columnIndex, property, false);
}
default <T> TypeCodec<T> codecFor(DataType type) {
return CodecRegistry.DEFAULT_INSTANCE.codecFor(type);

View file

@ -17,13 +17,18 @@ package net.helenus.mapping.value;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.Row;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import net.helenus.core.SessionRepository;
import net.helenus.mapping.HelenusProperty;
@ -37,15 +42,15 @@ public final class RowColumnValueProvider implements ColumnValueProvider {
}
@Override
public <V> V getColumnValue(Object sourceObj, int columnIndex, HelenusProperty property) {
public <V> V getColumnValue(Object sourceObj, int columnIndex, HelenusProperty property, boolean immutable) {
Row source = (Row) sourceObj;
Object value = null;
if (columnIndex != -1) {
value = readValueByIndex(source, columnIndex);
value = readValueByIndex(source, columnIndex, immutable);
} else {
value = readValueByName(source, property.getColumnName().getName());
value = readValueByName(source, property.getColumnName().getName(), immutable);
}
if (value != null) {
@ -60,7 +65,7 @@ public final class RowColumnValueProvider implements ColumnValueProvider {
return (V) value;
}
private Object readValueByIndex(Row source, int columnIndex) {
private Object readValueByIndex(Row source, int columnIndex, boolean immutable) {
if (source.isNull(columnIndex)) {
return null;
@ -76,12 +81,15 @@ public final class RowColumnValueProvider implements ColumnValueProvider {
switch (columnType.getName()) {
case SET :
return source.getSet(columnIndex, codecFor(typeArguments.get(0)).getJavaType());
Set set = source.getSet(columnIndex, codecFor(typeArguments.get(0)).getJavaType());
return immutable ? ImmutableSet.copyOf(set) : set;
case MAP :
return source.getMap(columnIndex, codecFor(typeArguments.get(0)).getJavaType(),
Map map = source.getMap(columnIndex, codecFor(typeArguments.get(0)).getJavaType(),
codecFor(typeArguments.get(1)).getJavaType());
return immutable ? ImmutableMap.copyOf(map) : map;
case LIST :
return source.getList(columnIndex, codecFor(typeArguments.get(0)).getJavaType());
List list = source.getList(columnIndex, codecFor(typeArguments.get(0)).getJavaType());
return immutable ? ImmutableList.copyOf(list) : list;
}
}
@ -91,7 +99,7 @@ public final class RowColumnValueProvider implements ColumnValueProvider {
return value;
}
private Object readValueByName(Row source, String columnName) {
private Object readValueByName(Row source, String columnName, boolean immutable) {
if (source.isNull(columnName)) {
return null;
@ -107,12 +115,15 @@ public final class RowColumnValueProvider implements ColumnValueProvider {
switch (columnType.getName()) {
case SET :
return source.getSet(columnName, codecFor(typeArguments.get(0)).getJavaType());
Set set = source.getSet(columnName, codecFor(typeArguments.get(0)).getJavaType());
return immutable ? ImmutableSet.copyOf(set) : set;
case MAP :
return source.getMap(columnName, codecFor(typeArguments.get(0)).getJavaType(),
Map map = source.getMap(columnName, codecFor(typeArguments.get(0)).getJavaType(),
codecFor(typeArguments.get(1)).getJavaType());
return immutable ? ImmutableMap.copyOf(map) : map;
case LIST :
return source.getList(columnName, codecFor(typeArguments.get(0)).getJavaType());
List list = source.getList(columnName, codecFor(typeArguments.get(0)).getJavaType());
return immutable ? ImmutableList.copyOf(list) : list;
}
}

View file

@ -36,7 +36,7 @@ public final class TupleColumnValueProvider implements ColumnValueProvider {
}
@Override
public <V> V getColumnValue(Object sourceObj, int columnIndexUnused, HelenusProperty property) {
public <V> V getColumnValue(Object sourceObj, int columnIndexUnused, HelenusProperty property, boolean immutable) {
int columnIndex = property.getOrdinal();

View file

@ -37,7 +37,7 @@ public final class UDTColumnValueProvider implements ColumnValueProvider {
@Override
@SuppressWarnings("unchecked")
public <V> V getColumnValue(Object sourceObj, int columnIndexUnused, HelenusProperty property) {
public <V> V getColumnValue(Object sourceObj, int columnIndexUnused, HelenusProperty property, boolean immutable) {
UDTValue source = (UDTValue) sourceObj;

View file

@ -20,6 +20,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import net.helenus.core.reflect.Drafted;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusProperty;
import net.helenus.support.HelenusMappingException;
@ -29,11 +30,13 @@ public final class ValueProviderMap implements Map<String, Object> {
private final Object source;
private final ColumnValueProvider valueProvider;
private final HelenusEntity entity;
private final boolean immutable;
public ValueProviderMap(Object source, ColumnValueProvider valueProvider, HelenusEntity entity) {
this.source = source;
this.valueProvider = valueProvider;
this.entity = entity;
this.immutable = entity.getMappingInterface().isAssignableFrom(Drafted.class);
}
@Override
@ -42,7 +45,7 @@ public final class ValueProviderMap implements Map<String, Object> {
String name = (String) key;
HelenusProperty prop = entity.getProperty(name);
if (prop != null) {
return valueProvider.getColumnValue(source, -1, prop);
return valueProvider.getColumnValue(source, -1, prop, immutable);
}
}
return null;
@ -121,4 +124,21 @@ public final class ValueProviderMap implements Map<String, Object> {
public String toString() {
return source.toString();
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || (!o.getClass().isAssignableFrom(Map.class) && getClass() != o.getClass()))
return false;
Map that = (Map) o;
if (this.size() != that.size())
return false;
for (String key : this.keySet())
if (!this.get(key).equals(that.get(key)))
return false;
return true;
}
}

View file

@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;

View file

@ -17,6 +17,7 @@ package net.helenus.test.integration.core.counter;
import static net.helenus.core.Query.eq;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
@ -24,8 +25,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.TimeoutException;
public class CounterTest extends AbstractEmbeddedCassandraTest {
static Page page;

View file

@ -5,7 +5,6 @@ import static net.helenus.core.Query.eq;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;

View file

@ -15,6 +15,7 @@
*/
package net.helenus.test.integration.core.index;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.Query;
@ -23,8 +24,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeoutException;
public class SecondaryIndexTest extends AbstractEmbeddedCassandraTest {
Book book;

View file

@ -20,7 +20,6 @@ import static net.helenus.core.Query.eq;
import com.datastax.driver.core.ResultSet;
import java.util.*;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.Operator;

View file

@ -15,6 +15,7 @@
*/
package net.helenus.test.integration.core.tuple;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.Query;
@ -23,8 +24,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.TimeoutException;
public class InnerTupleTest extends AbstractEmbeddedCassandraTest {
static PhotoAlbum photoAlbum;

View file

@ -20,6 +20,7 @@ import static net.helenus.core.Query.eq;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.TupleValue;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
@ -27,8 +28,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.TimeoutException;
public class TupleTest extends AbstractEmbeddedCassandraTest {
static Album album;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.tuplecollection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.tuplecollection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.tuplecollection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.tuplecollection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.tuplecollection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -21,7 +21,6 @@ import static net.helenus.core.Query.get;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.udtcollection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.udtcollection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.udtcollection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -21,7 +21,6 @@ import static net.helenus.core.Query.get;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Test;

View file

@ -25,6 +25,7 @@ import net.helenus.core.HelenusSession;
import net.helenus.core.UnitOfWork;
import net.helenus.core.annotation.Cacheable;
import net.helenus.mapping.annotation.Column;
import net.helenus.mapping.annotation.Index;
import net.helenus.mapping.annotation.PartitionKey;
import net.helenus.mapping.annotation.Table;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
@ -39,6 +40,7 @@ interface Widget {
UUID id();
@Column
@Index
String name();
}
@ -141,6 +143,46 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
});
}
}
@Test
public void testSelectViaIndexAfterSelect() throws Exception {
Widget w1, w2;
UUID key = UUIDs.timeBased();
try (UnitOfWork uow = session.begin()) {
// This should insert and cache Widget in the uow.
session
.<Widget>insert(widget)
.value(widget::id, key)
.value(widget::name, RandomString.make(20))
.sync(uow);
// This should read from the database and return a Widget.
w1 =
session
.<Widget>select(widget)
.where(widget::id, eq(key))
.single()
.sync(uow)
.orElse(null);
// This should read from the cache and get the same instance of a Widget.
w2 =
session
.<Widget>select(widget)
.where(widget::name, eq(w1.name()))
.single()
.sync(uow)
.orElse(null);
uow.commit()
.andThen(
() -> {
Assert.assertEquals(w1, w2);
});
}
}
/*
@Test
public void testSelectAfterInsertProperlyCachesEntity() throws Exception {

View file

@ -19,7 +19,6 @@ import com.google.common.collect.Sets;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.Query;

View file

@ -19,7 +19,6 @@ import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.Query;

View file

@ -21,7 +21,6 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
@ -60,15 +59,15 @@ public class MaterializedViewTest extends AbstractEmbeddedCassandraTest {
try {
session
.insert(cyclist)
.value(cyclist::cid, UUID.randomUUID())
.value(cyclist::age, 18)
.value(cyclist::birthday, dateFromString("1997-02-08"))
.value(cyclist::country, "Netherlands")
.value(cyclist::name, "Pascal EENKHOORN")
.sync();
.insert(cyclist)
.value(cyclist::cid, UUID.randomUUID())
.value(cyclist::age, 18)
.value(cyclist::birthday, dateFromString("1997-02-08"))
.value(cyclist::country, "Netherlands")
.value(cyclist::name, "Pascal EENKHOORN")
.sync();
} catch (TimeoutException e) {
}
catch (TimeoutException e) {}
}
@Test