Fixing a few corner cases in caching when using drafted entity objects. Working out kinks in the merge logic for entity instances in UOWs.
parent 6ff188f241
commit d19a9c741d
16 changed files with 448 additions and 186 deletions
@@ -32,6 +32,8 @@ import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.operation.AbstractOperation;
import net.helenus.core.operation.BatchOperation;
+ import net.helenus.core.reflect.Drafted;
+ import net.helenus.mapping.MappingUtil;
import net.helenus.support.Either;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -209,16 +211,33 @@ public abstract class AbstractUnitOfWork<E extends Exception>
if (eitherValue.isLeft()) {
value = eitherValue.getLeft();
}
- result = Optional.of(value);
- break;
+ return Optional.of(value);
}
}
}
if (!result.isPresent()) {

// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
if (parent != null) {
- return parent.cacheLookup(facets);
+ result = checkParentCache(facets);
+ if (result.isPresent()) {
+ Object r = result.get();
+ try {
+ Class<?> iface = MappingUtil.getMappingInterface(r);
+ if (Drafted.class.isAssignableFrom(iface)) {
+ cacheUpdate(r, facets);
+ } else {
+ cacheUpdate(MappingUtil.clone(r), facets);
+ }
+ } catch (CloneNotSupportedException e) {
+ result = Optional.empty();
+ }
+ }
return result;
}

+ private Optional<Object> checkParentCache(List<Facet> facets) {
+ Optional<Object> result = Optional.empty();
+ if (parent != null) {
+ result = parent.checkParentCache(facets);
+ }
+ return result;
+ }
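The lookup above now recurses through every enclosing UnitOfWork and, when an ancestor cache hits, installs the value into the current cache — cloning it first unless it is a Drafted (deliberately mutable) instance. A minimal standalone sketch of the recursion over simplified, hypothetical types (not the Helenus API):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch: nested unit-of-work caches. A local miss consults the parent
// chain, and an ancestor hit is promoted into the local cache.
class NestedCache {
  final NestedCache parent; // enclosing unit of work, or null at the root
  final Map<String, Object> entries = new HashMap<>();

  NestedCache(NestedCache parent) { this.parent = parent; }

  Optional<Object> lookup(String key) {
    Object value = entries.get(key);
    if (value != null) return Optional.of(value);
    if (parent == null) return Optional.empty();
    Optional<Object> hit = parent.lookup(key);
    hit.ifPresent(v -> entries.put(key, v)); // promote the ancestor hit
    return hit;
  }
}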
@@ -259,16 +278,20 @@ public abstract class AbstractUnitOfWork<E extends Exception>
}

@Override
- public void cacheUpdate(Object value, List<Facet> facets) {
+ public Object cacheUpdate(Object value, List<Facet> facets) {
+ Object result = null;
String tableName = CacheUtil.schemaName(facets);
for (Facet facet : facets) {
if (!facet.fixed()) {
if (facet.alone()) {
String columnName = facet.name() + "==" + facet.value();
+ if (result == null)
+ result = cache.get(tableName, columnName);
cache.put(tableName, columnName, Either.left(value));
}
}
}
+ return result;
}

public void batch(AbstractOperation s) {
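The cacheUpdate change above makes the method return whatever entry it displaced. Callers can then detect that a value was already cached for the same facets and keep handing out the original instance. A sketch of the calling pattern, with hypothetical names rather than the Helenus API:

import java.util.HashMap;
import java.util.Map;

// Sketch: an update that hands back the displaced entry so the caller can
// keep the instance other readers may already hold (identity preservation).
class UowCache {
  private final Map<String, Object> entries = new HashMap<>();

  Object update(String key, Object value) {
    return entries.put(key, value); // prior mapping, or null if none existed
  }
}

// Usage: prefer the previously cached instance when one exists.
// Object prior = cache.update(columnKey, pojo);
// pojo = (prior != null) ? prior : pojo;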
@@ -395,16 +418,10 @@ public abstract class AbstractUnitOfWork<E extends Exception>
private void mergeCache(Table<String, String, Either<Object, List<Facet>>> from) {
Table<String, String, Either<Object, List<Facet>>> to = this.cache;
from.rowMap()
- .forEach(
- (rowKey, columnMap) -> {
- columnMap.forEach(
- (columnKey, value) -> {
+ .forEach((rowKey, columnMap) -> {
+ columnMap.forEach((columnKey, value) -> {
if (to.contains(rowKey, columnKey)) {
// TODO(gburd): merge case, preserve object identity
- to.put(
- rowKey,
- columnKey,
- Either.left(
+ to.put(rowKey, columnKey, Either.left(
CacheUtil.merge(
to.get(rowKey, columnKey).getLeft(),
from.get(rowKey, columnKey).getLeft())));
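mergeCache runs when a nested UnitOfWork commits: every entry in the child's Guava Table is folded into the parent's, with entries present on both sides combined through CacheUtil.merge. The visible hunk shows only the merge branch; the sketch below assumes a plain copy-through for entries the parent does not yet hold, and uses a simplified value type:

import com.google.common.collect.Table;

// Sketch: fold a child unit-of-work's cache table into its parent's on commit.
class CacheMerger {
  static void mergeInto(Table<String, String, Object> to,
                        Table<String, String, Object> from) {
    from.rowMap().forEach((rowKey, columnMap) ->
        columnMap.forEach((columnKey, value) -> {
          if (to.contains(rowKey, columnKey)) {
            // Present on both sides: combine via the merge policy.
            to.put(rowKey, columnKey, combine(to.get(rowKey, columnKey), value));
          } else {
            // Assumed copy-through branch (not shown in the hunk above).
            to.put(rowKey, columnKey, value);
          }
        }));
  }

  // Stand-in for CacheUtil.merge; the real policy appears later in this diff.
  static Object combine(Object to, Object from) { return from; }
}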
@@ -256,8 +256,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
.collect(Collectors.toList());
for (Object pojo : items) {
HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
- Map<String, Object> valueMap =
- pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
+ Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
if (entity.isCacheable()) {
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : entity.getFacets()) {
@@ -266,11 +265,9 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet
.getProperties()
- .forEach(
- prop -> {
+ .forEach(prop -> {
if (valueMap == null) {
- Object value =
- BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop);
+ Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop);
binder.setValueForProperty(prop, value.toString());
} else {
Object v = valueMap.get(prop.getPropertyName());
@@ -393,9 +390,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
HelenusEntity entity = Helenus.resolve(pojo);
Class<?> entityClass = entity.getMappingInterface();

- return new SelectOperation<E>(
- this,
- entity,
+ return new SelectOperation<E>(this, entity,
(r) -> {
Map<String, Object> map = new ValueProviderMap(r, valueProvider, entity);
return (E) Helenus.map(entityClass, map);
@@ -407,9 +402,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
ColumnValueProvider valueProvider = getValueProvider();
HelenusEntity entity = Helenus.entity(entityClass);

- return new SelectOperation<E>(
- this,
- entity,
+ return new SelectOperation<E>(this, entity,
(r) -> {
Map<String, Object> map = new ValueProviderMap(r, valueProvider, entity);
return (E) Helenus.map(entityClass, map);
@@ -420,14 +413,19 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
return new SelectOperation<Fun.ArrayTuple>(this);
}

- public SelectOperation<Row> selectAll(Class<?> entityClass) {
+ public <E> SelectOperation<E> selectAll(Class<E> entityClass) {
Objects.requireNonNull(entityClass, "entityClass is empty");
- return new SelectOperation<Row>(this, Helenus.entity(entityClass));
+ HelenusEntity entity = Helenus.entity(entityClass);
+
+ return new SelectOperation<E>(this, entity,
+ (r) -> {
+ Map<String, Object> map = new ValueProviderMap(r, valueProvider, entity);
+ return (E) Helenus.map(entityClass, map);
+ });
}

public <E> SelectOperation<Row> selectAll(E pojo) {
- Objects.requireNonNull(
- pojo, "supplied object must be a dsl for a registered entity but cannot be null");
+ Objects.requireNonNull(pojo, "supplied object must be a dsl for a registered entity but cannot be null");
HelenusEntity entity = Helenus.resolve(pojo);
return new SelectOperation<Row>(this, entity);
}
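With the new signature, selectAll maps each Row through the entity's value provider, so call sites receive typed entities instead of raw Rows. A sketch mirroring the UnitOfWorkTest change later in this diff:

// Sketch: results arrive as Widget instances, filterable by entity accessors.
Widget found = session
    .<Widget>selectAll(Widget.class)
    .sync()
    .filter(w -> w.id().equals(key1))
    .findFirst()
    .orElse(null);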
@@ -60,7 +60,7 @@ public interface UnitOfWork<X extends Exception> extends AutoCloseable {

Optional<Object> cacheLookup(List<Facet> facets);

- void cacheUpdate(Object pojo, List<Facet> facets);
+ Object cacheUpdate(Object pojo, List<Facet> facets);

List<Facet> cacheEvict(List<Facet> facets);
134 src/main/java/net/helenus/core/cache/CacheUtil.java vendored
@@ -1,8 +1,18 @@
package net.helenus.core.cache;

+ import net.helenus.core.Helenus;
+ import net.helenus.core.reflect.Entity;
+ import net.helenus.core.reflect.MapExportable;
+ import net.helenus.mapping.HelenusEntity;
+ import net.helenus.mapping.HelenusProperty;
+ import net.helenus.mapping.MappingUtil;
+ import net.helenus.mapping.value.BeanColumnValueProvider;
+ import net.helenus.support.HelenusException;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CacheUtil {
@@ -83,28 +93,98 @@ public class CacheUtil {

/**
* Merge changed values in the map behind `from` into `to`.
*
* @param to
* @param from
* @return
*/
- public static Object merge(Object to, Object from) {
- if (to == from) {
- return to;
- } else {
- return from;
+ public static Object merge(Object t, Object f) {
+ HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(t));
+
+ if (t == f) return t;
+ if (f == null) return t;
+ if (t == null) return f;
+
+ if (t instanceof MapExportable && t instanceof Entity && f instanceof MapExportable && f instanceof Entity) {
+ Entity to = (Entity) t;
+ Entity from = (Entity) f;
+ Map<String, Object> toValueMap = ((MapExportable) to).toMap();
+ Map<String, Object> fromValueMap = ((MapExportable) from).toMap();
+ for (HelenusProperty prop : entity.getOrderedProperties()) {
+ switch (prop.getColumnType()) {
+ case PARTITION_KEY:
+ case CLUSTERING_COLUMN:
+ continue;
+ default:
+ Object toVal = BeanColumnValueProvider.INSTANCE.getColumnValue(to, -1, prop, false);
+ Object fromVal = BeanColumnValueProvider.INSTANCE.getColumnValue(from, -1, prop, false);
+ String ttlKey = ttlKey(prop);
+ String writeTimeKey = writeTimeKey(prop);
+ int[] toTtlI = (int[]) toValueMap.get(ttlKey);
+ int toTtl = (toTtlI != null) ? toTtlI[0] : 0;
+ Long toWriteTime = (Long) toValueMap.get(writeTimeKey);
+ int[] fromTtlI = (int[]) fromValueMap.get(ttlKey);
+ int fromTtl = (fromTtlI != null) ? fromTtlI[0] : 0;
+ Long fromWriteTime = (Long) fromValueMap.get(writeTimeKey);
+
+ if (toVal != null) {
+ if (fromVal != null) {
+ if (toVal == fromVal) {
+ // Case: object identity
+ // Goal: ensure write time and ttl are also in sync
+ if (fromWriteTime != null && fromWriteTime != 0L &&
+ (toWriteTime == null || fromWriteTime > toWriteTime)) {
+ ((MapExportable) to).put(writeTimeKey, fromWriteTime);
+ }
- /*
- * // TODO(gburd): take ttl and writeTime into account when merging. Map<String,
- * Object> toValueMap = to instanceof MapExportable ? ((MapExportable)
- * to).toMap() : null; Map<String, Object> fromValueMap = to instanceof
- * MapExportable ? ((MapExportable) from).toMap() : null;
- *
- * if (toValueMap != null && fromValueMap != null) { for (String key :
- * fromValueMap.keySet()) { if (toValueMap.containsKey(key) &&
- * toValueMap.get(key) != fromValueMap.get(key)) { toValueMap.put(key,
- * fromValueMap.get(key)); } } } return to;
- */
+ if (fromTtl > 0 && fromTtl > toTtl) {
+ ((MapExportable) to).put(ttlKey, fromTtl);
+ }
+ } else if (fromWriteTime != null && fromWriteTime != 0L) {
+ // Case: to exists and from exists
+ // Goal: copy over from -> to iff from.writeTime > to.writeTime
+ if (toWriteTime != null && toWriteTime != 0L) {
+ if (fromWriteTime > toWriteTime) {
+ ((MapExportable) to).put(prop.getPropertyName(), fromVal);
+ ((MapExportable) to).put(writeTimeKey, fromWriteTime);
+ if (fromTtl > 0) {
+ ((MapExportable) to).put(ttlKey, fromTtl);
+ }
+ }
+ } else {
+ ((MapExportable) to).put(prop.getPropertyName(), fromVal);
+ ((MapExportable) to).put(writeTimeKey, fromWriteTime);
+ if (fromTtl > 0) {
+ ((MapExportable) to).put(ttlKey, fromTtl);
+ }
+ }
+ } else {
+ if (toWriteTime == null || toWriteTime == 0L) {
+ // Caution, entering grey area...
+ if (!toVal.equals(fromVal)) {
+ // dangerous waters here, values diverge without information that enables resolution,
+ // policy (for now) is to move value from -> to anyway.
+ ((MapExportable) to).put(prop.getPropertyName(), fromVal);
+ if (fromTtl > 0) {
+ ((MapExportable) to).put(ttlKey, fromTtl);
+ }
+ }
+ }
+ }
+ }
+ } else {
+ // Case: from exists, but to doesn't (it's null)
+ // Goal: copy over from -> to, include ttl and writeTime if present
+ if (fromVal != null) {
+ ((MapExportable) to).put(prop.getPropertyName(), fromVal);
+ if (fromWriteTime != null && fromWriteTime != 0L) {
+ ((MapExportable) to).put(writeTimeKey, fromWriteTime);
+ }
+ if (fromTtl > 0) {
+ ((MapExportable) to).put(ttlKey, fromTtl);
+ }
+ }
+ }
+ }
+ }
+ return to;
+ }
+ return t;
+ }

public static String schemaName(List<Facet> facets) {
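The merge above is effectively per-column last-writer-wins: a value moves from f into t only when its recorded write time is newer (TTLs ride along), and when neither side carries a write time the from-side wins by declared policy. The core rule, reduced to a standalone sketch over plain maps (hypothetical helper, not the Helenus API):

import java.util.Map;

// Sketch: per-column last-writer-wins keyed on the synthetic write-time entry.
class MergeRule {
  static void mergeColumn(Map<String, Object> to, Map<String, Object> from,
                          String column, String writeTimeKey) {
    Long toWt = (Long) to.get(writeTimeKey);
    Long fromWt = (Long) from.get(writeTimeKey);
    boolean fromIsNewer =
        fromWt != null && fromWt != 0L && (toWt == null || fromWt > toWt);
    if (fromIsNewer) {
      to.put(column, from.get(column)); // copy the newer column value
      to.put(writeTimeKey, fromWt);     // and its write-time metadata
    }
  }
}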
@@ -115,9 +195,17 @@ public class CacheUtil {
.collect(Collectors.joining("."));
}

- public static String writeTimeKey(String propertyName) {
- return "_" + propertyName + "_writeTime";
+ public static String writeTimeKey(HelenusProperty prop) {
+ return writeTimeKey(prop.getColumnName().toCql(false));
}

- public static String ttlKey(String propertyName) { return "_" + propertyName + "_ttl"; }
+ public static String ttlKey(HelenusProperty prop) {
+ return ttlKey(prop.getColumnName().toCql(false));
+ }
+
+ public static String writeTimeKey(String columnName) {
+ return "_" + columnName + "_writeTime";
+ }
+
+ public static String ttlKey(String columnName) { return "_" + columnName + "_ttl"; }
}
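Both helpers now key off the CQL column name rather than the property name, so for a column named code the synthetic entries in an entity's backing map look like this:

// Key format produced by the helpers above:
String wt = CacheUtil.writeTimeKey("code"); // "_code_writeTime"
String ttl = CacheUtil.ttlKey("code");      // "_code_ttl"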
@@ -41,6 +41,7 @@ public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>

public <V> O where(Getter<V> getter, Operator operator, V val) {

+ if (val != null)
addFilter(Filter.create(getter, operator, val));

return (O) this;

@@ -62,6 +63,7 @@ public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>

public <V> O and(Getter<V> getter, Operator operator, V val) {

+ if (val != null)
addFilter(Filter.create(getter, operator, val));

return (O) this;

@@ -83,6 +85,7 @@ public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>

public <V> O onlyIf(Getter<V> getter, Operator operator, V val) {

+ if (val != null)
addIfFilter(Filter.create(getter, operator, val));

return (O) this;
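Each added guard turns a null criterion into a silent no-op: the filter is simply not added rather than failing inside Filter.create. A sketch of the resulting call-site behavior, assuming the three-argument form with Operator.EQ (values here are hypothetical):

// Sketch: with the guard, an absent (null) criterion contributes no filter.
String maybeRegion = null; // e.g. an optional request parameter
session.<Supply>select(Supply.class)
    .where(supply::id, Operator.EQ, id)
    .and(supply::region, Operator.EQ, maybeRegion) // skipped: val is null
    .sync();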
@@ -33,6 +33,8 @@ import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
+ import net.helenus.core.reflect.Drafted;
+ import net.helenus.mapping.MappingUtil;
import net.helenus.support.Fun;

public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
@@ -69,6 +71,8 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>

if (updateCache) {
List<Facet> facets = bindFacetValues();
+ if (facets != null && facets.size() > 0) {
+ if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
@@ -81,6 +85,10 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
cacheMiss.mark();
}
}
+ } else {
+ //TODO(gburd): look in statement cache for results
+ }
+ }

if (!result.isPresent()) {
// Formulate the query and execute it against the Cassandra cluster.
@@ -128,7 +136,8 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
Stopwatch timer = Stopwatch.createStarted();
try {
List<Facet> facets = bindFacetValues();
- if (facets != null) {
+ if (facets != null && facets.size() > 0) {
+ if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
cachedResult = checkCache(uow, facets);
if (cachedResult != null) {
updateCache = false;
@@ -137,23 +146,49 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0);
} else {
- updateCache = true;
uowCacheMiss.mark();
if (isSessionCacheable()) {
String tableName = CacheUtil.schemaName(facets);
cachedResult = (E) sessionOps.checkCache(tableName, facets);
+ Class<?> iface = MappingUtil.getMappingInterface(cachedResult);
if (cachedResult != null) {
+ try {
+ if (Drafted.class.isAssignableFrom(iface)) {
result = Optional.of(cachedResult);
+ } else {
+ result = Optional.of(MappingUtil.clone(cachedResult));
+ }
sessionCacheHits.mark();
cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0);
+ } catch (CloneNotSupportedException e) {
+ result = Optional.empty();
+ sessionCacheMiss.mark();
+ cacheMiss.mark();
+ uow.recordCacheAndDatabaseOperationCount(-1, 0);
+ } finally {
+ if (result.isPresent()) {
+ updateCache = true;
+ } else {
+ updateCache = false;
+ }
+ }
} else {
updateCache = false;
sessionCacheMiss.mark();
cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0);
}
} else {
updateCache = false;
}
}
} else {
+ //TODO(gburd): look in statement cache for results
cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0);
updateCache = false; //true;
}
} else {
updateCache = false;
}
@@ -175,15 +210,8 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
} else {

// Formulate the query and execute it against the Cassandra cluster.
- ResultSet resultSet =
- execute(
- sessionOps,
- uow,
- traceContext,
- queryExecutionTimeout,
- queryTimeoutUnits,
- showValues,
- true);
+ ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
+ showValues, true);

// Transform the query result set into the desired shape.
result = transform(resultSet);
@@ -326,10 +326,9 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>>
return result;
}

- protected void cacheUpdate(UnitOfWork<?> uow, E pojo, List<Facet> identifyingFacets) {
+ protected Object cacheUpdate(UnitOfWork<?> uow, E pojo, List<Facet> identifyingFacets) {
List<Facet> facets = new ArrayList<>();
- Map<String, Object> valueMap =
- pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
+ Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;

for (Facet facet : identifyingFacets) {
if (facet instanceof UnboundFacet) {

@@ -358,6 +357,6 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>>
}

// Cache the value (pojo), the statement key, and the fully bound facets.
- uow.cacheUpdate(pojo, facets);
+ return uow.cacheUpdate(pojo, facets);
}
}
@@ -34,6 +34,8 @@ import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
+ import net.helenus.core.reflect.Drafted;
+ import net.helenus.mapping.MappingUtil;
import net.helenus.support.Fun;

public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
@@ -70,6 +72,8 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>

if (!ignoreCache() && isSessionCacheable()) {
List<Facet> facets = bindFacetValues();
+ if (facets != null && facets.size() > 0) {
+ if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {

@@ -81,6 +85,10 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
sessionCacheMiss.mark();
cacheMiss.mark();
}
+ } else {
+ //TODO(gburd): look in statement cache for results
+ }
+ }
}

if (resultStream == null) {
@@ -105,7 +113,8 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
List<E> again = new ArrayList<>();
resultStream.forEach(
result -> {
- if (!(result instanceof Fun)) {
+ Class<?> resultClass = result.getClass();
+ if (!(resultClass.getEnclosingClass() != null && resultClass.getEnclosingClass() == Fun.class)) {
sessionOps.updateCache(result, facets);
}
again.add(result);
@@ -133,7 +142,8 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
Stopwatch timer = Stopwatch.createStarted();
try {
List<Facet> facets = bindFacetValues();
- if (facets != null) {
+ if (facets != null && facets.size() > 0) {
+ if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
cachedResult = checkCache(uow, facets);
if (cachedResult != null) {
updateCache = false;
@@ -142,23 +152,50 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0);
} else {
- updateCache = true;
uowCacheMiss.mark();
if (isSessionCacheable()) {
String tableName = CacheUtil.schemaName(facets);
cachedResult = (E) sessionOps.checkCache(tableName, facets);
+ Class<?> iface = MappingUtil.getMappingInterface(cachedResult);
if (cachedResult != null) {
- resultStream = Stream.of(cachedResult);
+ E result = null;
+ try {
+ if (Drafted.class.isAssignableFrom(iface)) {
+ result = cachedResult;
+ } else {
+ result = MappingUtil.clone(cachedResult);
+ }
+ resultStream = Stream.of(result);
sessionCacheHits.mark();
cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0);
+ } catch (CloneNotSupportedException e) {
+ resultStream = null;
+ sessionCacheMiss.mark();
+ uow.recordCacheAndDatabaseOperationCount(-1, 0);
+ } finally {
+ if (result != null) {
+ updateCache = true;
+ } else {
+ updateCache = false;
+ }
+ }
} else {
updateCache = false;
sessionCacheMiss.mark();
cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0);
}
} else {
updateCache = false;
}
}
} else {
+ //TODO(gburd): look in statement cache for results
updateCache = false; //true;
cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0);
}
} else {
updateCache = false;
}
@@ -172,15 +209,8 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>

// Check to see if we fetched the object from the cache
if (resultStream == null) {
- ResultSet resultSet =
- execute(
- sessionOps,
- uow,
- traceContext,
- queryExecutionTimeout,
- queryTimeoutUnits,
- showValues,
- true);
+ ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
+ showValues, true);
resultStream = transform(resultSet);
}
@@ -196,7 +226,7 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
if (result != deleted
&& !(resultClass.getEnclosingClass() != null
&& resultClass.getEnclosingClass() == Fun.class)) {
- cacheUpdate(uow, result, facets);
+ result = (E) cacheUpdate(uow, result, facets);
}
again.add(result);
});
@@ -265,7 +265,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperation<T>>

protected void adjustTtlAndWriteTime(MapExportable pojo) {
if (ttl != null || writeTime != 0L) {
- List<String> propertyNames = values.stream()
+ List<String> columnNames = values.stream()
.map(t -> t._1.getProperty())
.filter(prop -> {
switch (prop.getColumnType()) {

@@ -279,12 +279,12 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperation<T>>
.map(prop -> prop.getColumnName().toCql(false))
.collect(Collectors.toList());

- if (propertyNames.size() > 0) {
+ if (columnNames.size() > 0) {
if (ttl != null) {
- propertyNames.forEach(name -> pojo.put(CacheUtil.ttlKey(name), ttl));
+ columnNames.forEach(name -> pojo.put(CacheUtil.ttlKey(name), ttl));
}
if (writeTime != 0L) {
- propertyNames.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), writeTime));
+ columnNames.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), writeTime));
}
}
}
@@ -15,6 +15,8 @@
*/
package net.helenus.core.reflect;

+ import net.helenus.core.Getter;
+
import java.util.Map;

public interface MapExportable {

@@ -24,4 +26,6 @@ public interface MapExportable {
Map<String, Object> toMap();
default Map<String, Object> toMap(boolean mutable) { return null; }
+ default void put(String key, Object value) { }
+ default <T> void put(Getter<T> getter, T value) { }

}
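The new default methods give caching and merge code a uniform write path into an entity's backing map, covering both real columns and the synthetic ttl/write-time entries stamped by InsertOperation elsewhere in this diff. A sketch (pojo and now are hypothetical):

// Sketch: writing through the MapExportable view of a mapped entity.
MapExportable exportable = (MapExportable) pojo;      // a mapped entity instance
exportable.put("code", "WIDGET-002");                 // real column value
exportable.put(CacheUtil.writeTimeKey("code"), now);  // synthetic metadata entry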
@@ -107,12 +107,21 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializable
}

if (MapExportable.PUT_METHOD.equals(methodName) && method.getParameterCount() == 2) {
- final String key = (String)args[0];
+ final String key;
+ if (args[0] instanceof String) {
+ key = (String) args[0];
+ } else if (args[0] instanceof Getter) {
+ key = MappingUtil.resolveMappingProperty((Getter)args[0]).getProperty().getPropertyName();
+ } else {
+ key = null;
+ }
+ if (key != null) {
final Object value = (Object) args[1];
if (src instanceof ValueProviderMap) {
this.src = fromValueProviderMap(src);
}
src.put(key, value);
+ }
return null;
}
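With this handler change, put accepts either a raw property-name string or a Getter — the dsl method reference — which the proxy resolves to a property name via MappingUtil. A sketch against the Supply entity used in the tests below (s1 is hypothetical):

// Sketch: both key forms dispatch through the proxy's PUT_METHOD handler.
MapExportable s = (MapExportable) s1;   // s1: a Supply instance
s.put("code", "WIDGET-002");            // string key, as before
s.put(supply::code, "WIDGET-002");      // Getter key, resolved to "code"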
@@ -21,9 +21,11 @@ import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+ import java.util.UUID;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
+ import net.helenus.core.UnitOfWork;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -34,6 +36,8 @@ public class EntityDraftBuilderTest extends AbstractEmbeddedCassandraTest {
static Supply supply;
static HelenusSession session;
static Supply.Draft draft = null;
+ static UUID id = null;
+ static String region = null;

@BeforeClass
public static void beforeTest() throws TimeoutException {
@@ -68,25 +72,28 @@ public class EntityDraftBuilderTest extends AbstractEmbeddedCassandraTest {
});

Supply s1 = session.<Supply>insert(draft).sync();
+ id = s1.id();
+ region = s1.region();
}

@Test
public void testFoo() throws Exception {

- Supply s1 =
- session
+ Supply s1 = session
.<Supply>select(Supply.class)
- .where(supply::id, eq(draft.id()))
+ .where(supply::id, eq(id))
+ .and(supply::region, eq(region))
.single()
.sync()
.orElse(null);

// List
- Supply s2 =
- session
+ Supply s2 = session
.<Supply>update(s1.update())
+ .and(supply::region, eq(region))
.prepend(supply::suppliers, "Pignose Supply, LLC.")
.sync();

Assert.assertEquals(s2.suppliers().get(0), "Pignose Supply, LLC.");

// Set
@@ -99,6 +106,58 @@ public class EntityDraftBuilderTest extends AbstractEmbeddedCassandraTest {
Assert.assertEquals((long) s4.demand().get("NORAM"), 10L);
}

+ @Test
+ public void testDraftMergeInNestedUow() throws Exception {
+ Supply s1, s2, s3, s4, s5;
+ Supply.Draft d1;
+
+ s1 = session
+ .<Supply>select(Supply.class)
+ .where(supply::id, eq(id))
+ .and(supply::region, eq(region))
+ .single()
+ .sync()
+ .orElse(null);
+
+ try(UnitOfWork uow1 = session.begin()) {
+ s2 = session
+ .<Supply>select(Supply.class)
+ .where(supply::id, eq(id))
+ .and(supply::region, eq(region))
+ .single()
+ .sync(uow1)
+ .orElse(null);
+
+ try(UnitOfWork uow2 = session.begin(uow1)) {
+ s3 = session
+ .<Supply>select(Supply.class)
+ .where(supply::id, eq(id))
+ .and(supply::region, eq(region))
+ .single()
+ .sync(uow2)
+ .orElse(null);
+
+ d1 = s3.update()
+ .setCode("WIDGET-002-UPDATED");
+
+ s4 = session.update(d1)
+ .usingTtl(20)
+ .defaultTimestamp(System.currentTimeMillis())
+ .sync(uow2);
+
+ uow2.commit();
+ }
+
+ s5 = session
+ .<Supply>select(Supply.class)
+ .where(supply::id, eq(id))
+ .and(supply::region, eq(region))
+ .single()
+ .sync(uow1)
+ .orElse(null);
+ }
+ }

@Test
public void testSerialization() throws Exception {
Supply s1, s2;
@@ -4,11 +4,13 @@ import java.util.Map;
import java.util.UUID;
import net.helenus.core.AbstractAuditedEntityDraft;
import net.helenus.core.Helenus;
+ import net.helenus.core.reflect.Drafted;
+ import net.helenus.core.reflect.Entity;
import net.helenus.core.reflect.MapExportable;
import net.helenus.mapping.annotation.*;

@Table
- public interface Inventory {
+ public interface Inventory extends Entity, Drafted<Inventory> {

static Inventory inventory = Helenus.dsl(Inventory.class);
@@ -7,11 +7,15 @@ import java.util.Set;
import java.util.UUID;
import net.helenus.core.AbstractEntityDraft;
import net.helenus.core.Helenus;
+ import net.helenus.core.annotation.Cacheable;
+ import net.helenus.core.reflect.Drafted;
+ import net.helenus.core.reflect.Entity;
import net.helenus.core.reflect.MapExportable;
import net.helenus.mapping.annotation.*;

@Table
- public interface Supply {
+ @Cacheable
+ public interface Supply extends Entity, Drafted<Supply> {

static Supply supply = Helenus.dsl(Supply.class);
@@ -207,6 +207,20 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest {
Assert.assertEquals(0L, cnt);
}

+ public void testFunTuple() throws TimeoutException {
+ Fun.Tuple1<String> tf = session
+ .select(user::name)
+ .where(user::id, eq(100L))
+ .single()
+ .sync()
+ .orElse(null);
+ if (tf != null) {
+ Assert.assertEquals(Fun.class, tf.getClass().getEnclosingClass());
+ String name = tf._1;
+ Assert.assertEquals("greg", name);
+ }
+ }
+
public void testZipkin() throws TimeoutException {
session
.update()
@@ -28,6 +28,7 @@ import net.helenus.core.HelenusSession;
import net.helenus.core.UnitOfWork;
import net.helenus.core.annotation.Cacheable;
+ import net.helenus.core.reflect.Entity;
import net.helenus.core.reflect.MapExportable;
import net.helenus.mapping.annotation.Constraints;
import net.helenus.mapping.annotation.Index;
import net.helenus.mapping.annotation.PartitionKey;

@@ -125,14 +126,13 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {

@Test
public void testSelectAfterNestedSelect() throws Exception {
- Widget w1, w2, w3, w4;
+ Widget w1, w1a, w2, w3, w4;
UUID key1 = UUIDs.timeBased();
UUID key2 = UUIDs.timeBased();

// This should inserted Widget, and not cache it in uow1.
try (UnitOfWork uow1 = session.begin()) {
- w1 =
- session
+ w1 = session
.<Widget>insert(widget)
.value(widget::id, key1)
.value(widget::name, RandomString.make(20))
@@ -144,9 +144,18 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {

try (UnitOfWork uow2 = session.begin(uow1)) {

+ // A "SELECT * FROM widget" query does not contain enough information to fetch an item from cache.
+ // This will miss, until we implement a statement cache.
+ w1a = session
+ .<Widget>selectAll(Widget.class)
+ .sync(uow2)
+ .filter(w -> w.id().equals(key1))
+ .findFirst()
+ .orElse(null);
+ Assert.assertTrue(w1.equals(w1a));
+
// This should read from uow1's cache and return the same Widget.
- w2 =
- session
+ w2 = session
.<Widget>select(widget)
.where(widget::id, eq(key1))
.single()
@@ -155,8 +164,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {

Assert.assertEquals(w1, w2);

- w3 =
- session
+ w3 = session
.<Widget>insert(widget)
.value(widget::id, key2)
.value(widget::name, RandomString.make(20))
@@ -174,8 +182,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
}

// This should read from the cache and get the same instance of a Widget.
- w4 =
- session
+ w4 = session
.<Widget>select(widget)
.where(widget::a, eq(w3.a()))
.and(widget::b, eq(w3.b()))