Formatting.

Greg Burd 2017-11-12 20:14:31 -05:00
parent d19a9c741d
commit c025dc35a7
37 changed files with 874 additions and 698 deletions


@@ -25,7 +25,6 @@ import java.io.PrintStream;
 import java.util.List;
 import java.util.concurrent.Executor;
 import net.helenus.core.cache.Facet;
-import net.helenus.core.operation.Operation;
 import net.helenus.mapping.value.ColumnValuePreparer;
 import net.helenus.mapping.value.ColumnValueProvider;
 import net.helenus.support.Either;


@@ -17,8 +17,6 @@ package net.helenus.core;
 import static net.helenus.core.HelenusSession.deleted;
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.ResultSet;
 import com.diffplug.common.base.Errors;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.HashBasedTable;
@@ -223,9 +221,9 @@ public abstract class AbstractUnitOfWork<E extends Exception>
     try {
       Class<?> iface = MappingUtil.getMappingInterface(r);
       if (Drafted.class.isAssignableFrom(iface)) {
        cacheUpdate(r, facets);
       } else {
         cacheUpdate(MappingUtil.clone(r), facets);
       }
     } catch (CloneNotSupportedException e) {
       result = Optional.empty();
@@ -235,11 +233,11 @@ public abstract class AbstractUnitOfWork<E extends Exception>
   }

   private Optional<Object> checkParentCache(List<Facet> facets) {
     Optional<Object> result = Optional.empty();
     if (parent != null) {
       result = parent.checkParentCache(facets);
     }
     return result;
   }

   @Override
@@ -285,8 +283,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
       if (!facet.fixed()) {
         if (facet.alone()) {
           String columnName = facet.name() + "==" + facet.value();
-          if (result == null)
-            result = cache.get(tableName, columnName);
+          if (result == null) result = cache.get(tableName, columnName);
           cache.put(tableName, columnName, Either.left(value));
         }
       }
@@ -314,8 +311,8 @@ public abstract class AbstractUnitOfWork<E extends Exception>
   public PostCommitFunction<Void, Void> commit() throws E, TimeoutException {
     if (batch != null) {
       committedAt = batch.sync(this);
       //TODO(gburd) update cache with writeTime...
     }

     // All nested UnitOfWork should be committed (not aborted) before calls to
@@ -387,14 +384,14 @@ public abstract class AbstractUnitOfWork<E extends Exception>
   }

   private void addBatched(BatchOperation batch) {
     if (this.batch == null) {
       this.batch = batch;
     } else {
       this.batch.addAll(batch);
     }
   }

   /* Explicitly discard the work and mark it as such in the log. */
   public synchronized void abort() {
     TreeTraverser<AbstractUnitOfWork<E>> traverser =
         TreeTraverser.using(node -> node::getChildNodes);
@@ -418,13 +415,18 @@ public abstract class AbstractUnitOfWork<E extends Exception>
   private void mergeCache(Table<String, String, Either<Object, List<Facet>>> from) {
     Table<String, String, Either<Object, List<Facet>>> to = this.cache;
     from.rowMap()
-        .forEach((rowKey, columnMap) -> {
-          columnMap.forEach((columnKey, value) -> {
-            if (to.contains(rowKey, columnKey)) {
-              to.put(rowKey, columnKey, Either.left(
+        .forEach(
+            (rowKey, columnMap) -> {
+              columnMap.forEach(
+                  (columnKey, value) -> {
+                    if (to.contains(rowKey, columnKey)) {
+                      to.put(
+                          rowKey,
+                          columnKey,
+                          Either.left(
                               CacheUtil.merge(
                                   to.get(rowKey, columnKey).getLeft(),
                                   from.get(rowKey, columnKey).getLeft())));
                     } else {
                       to.put(rowKey, columnKey, from.get(rowKey, columnKey));
                     }
@@ -453,5 +455,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
     return committed;
   }

-  public long committedAt() { return committedAt; }
+  public long committedAt() {
+    return committedAt;
+  }
 }
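Editor's note: `mergeCache` above folds a child unit of work's cache into its parent's on commit. A minimal, self-contained sketch of the same Guava `Table` merge pattern — the `String` payloads and the string-concatenating combiner are illustrative stand-ins, not Helenus code:

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

public class TableMergeSketch {
  // Merge every cell of `from` into `to`; on collision, combine the two values.
  static void mergeInto(Table<String, String, String> to, Table<String, String, String> from) {
    from.rowMap()
        .forEach(
            (rowKey, columnMap) ->
                columnMap.forEach(
                    (columnKey, value) -> {
                      if (to.contains(rowKey, columnKey)) {
                        // Stand-in for CacheUtil.merge(): reconcile the two cached objects.
                        to.put(rowKey, columnKey, to.get(rowKey, columnKey) + "+" + value);
                      } else {
                        to.put(rowKey, columnKey, value);
                      }
                    }));
  }

  public static void main(String[] args) {
    Table<String, String, String> parent = HashBasedTable.create();
    Table<String, String, String> child = HashBasedTable.create();
    parent.put("widget", "id==1", "v1");
    child.put("widget", "id==1", "v2"); // collides, gets combined
    child.put("widget", "id==2", "v3"); // new, copied over
    mergeInto(parent, child);
    System.out.println(parent); // {widget={id==1=v1+v2, id==2=v3}}
  }
}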


@@ -79,7 +79,8 @@ public final class Filter<V> {
     return new Filter<V>(node, postulate);
   }

-  public static <V> Filter<V> create(Getter<V> getter, HelenusPropertyNode node, Postulate<V> postulate) {
+  public static <V> Filter<V> create(
+      Getter<V> getter, HelenusPropertyNode node, Postulate<V> postulate) {
     Objects.requireNonNull(getter, "empty getter");
     Objects.requireNonNull(postulate, "empty operator");
     return new Filter<V>(node, postulate);


@@ -256,7 +256,8 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
         .collect(Collectors.toList());
     for (Object pojo : items) {
       HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
-      Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
+      Map<String, Object> valueMap =
+          pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
       if (entity.isCacheable()) {
         List<Facet> boundFacets = new ArrayList<>();
         for (Facet facet : entity.getFacets()) {
@@ -265,9 +266,11 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
             UnboundFacet.Binder binder = unboundFacet.binder();
             unboundFacet
                 .getProperties()
-                .forEach(prop -> {
+                .forEach(
+                    prop -> {
                       if (valueMap == null) {
-                        Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop);
+                        Object value =
+                            BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop);
                         binder.setValueForProperty(prop, value.toString());
                       } else {
                         Object v = valueMap.get(prop.getPropertyName());
@@ -390,7 +393,9 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
     HelenusEntity entity = Helenus.resolve(pojo);
     Class<?> entityClass = entity.getMappingInterface();

-    return new SelectOperation<E>(this, entity,
+    return new SelectOperation<E>(
+        this,
+        entity,
         (r) -> {
           Map<String, Object> map = new ValueProviderMap(r, valueProvider, entity);
           return (E) Helenus.map(entityClass, map);
@@ -402,7 +407,9 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
     ColumnValueProvider valueProvider = getValueProvider();
     HelenusEntity entity = Helenus.entity(entityClass);

-    return new SelectOperation<E>(this, entity,
+    return new SelectOperation<E>(
+        this,
+        entity,
         (r) -> {
           Map<String, Object> map = new ValueProviderMap(r, valueProvider, entity);
           return (E) Helenus.map(entityClass, map);
@@ -417,7 +424,9 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
     Objects.requireNonNull(entityClass, "entityClass is empty");
     HelenusEntity entity = Helenus.entity(entityClass);

-    return new SelectOperation<E>(this, entity,
+    return new SelectOperation<E>(
+        this,
+        entity,
         (r) -> {
           Map<String, Object> map = new ValueProviderMap(r, valueProvider, entity);
           return (E) Helenus.map(entityClass, map);
@@ -425,7 +434,8 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
   }

   public <E> SelectOperation<Row> selectAll(E pojo) {
-    Objects.requireNonNull(pojo, "supplied object must be a dsl for a registered entity but cannot be null");
+    Objects.requireNonNull(
+        pojo, "supplied object must be a dsl for a registered entity but cannot be null");
     HelenusEntity entity = Helenus.resolve(pojo);
     return new SelectOperation<Row>(this, entity);
   }
@@ -440,7 +450,8 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
     Objects.requireNonNull(getter1, "field 1 is empty");

     HelenusPropertyNode p1 = MappingUtil.resolveMappingProperty(getter1);
-    return new SelectOperation<Tuple1<V1>>(this, new Mappers.Mapper1<V1>(getValueProvider(), p1), p1);
+    return new SelectOperation<Tuple1<V1>>(
+        this, new Mappers.Mapper1<V1>(getValueProvider(), p1), p1);
   }

   public <V1, V2> SelectOperation<Tuple2<V1, V2>> select(Getter<V1> getter1, Getter<V2> getter2) {
@@ -449,7 +460,8 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
     HelenusPropertyNode p1 = MappingUtil.resolveMappingProperty(getter1);
     HelenusPropertyNode p2 = MappingUtil.resolveMappingProperty(getter2);
-    return new SelectOperation<Fun.Tuple2<V1, V2>>(this, new Mappers.Mapper2<V1, V2>(getValueProvider(), p1, p2), p1, p2);
+    return new SelectOperation<Fun.Tuple2<V1, V2>>(
+        this, new Mappers.Mapper2<V1, V2>(getValueProvider(), p1, p2), p1, p2);
   }

   public <V1, V2, V3> SelectOperation<Fun.Tuple3<V1, V2, V3>> select(
@@ -723,7 +735,8 @@ public class HelenusSession extends AbstractSessionOperations implements Closeable
   }

   public <T> InsertOperation<T> upsert(T pojo) {
-    Objects.requireNonNull(pojo,
+    Objects.requireNonNull(
+        pojo,
         "supplied object must be either an instance of the entity class or a dsl for it, but cannot be null");
     HelenusEntity entity = null;
     try {


@@ -77,14 +77,16 @@ public final class TableOperations {
   }

   public void createView(HelenusEntity entity) {
-    sessionOps.execute(SchemaUtil.createMaterializedView(
+    sessionOps.execute(
+        SchemaUtil.createMaterializedView(
             sessionOps.usingKeyspace(), entity.getName().toCql(), entity));
     // executeBatch(SchemaUtil.createIndexes(entity)); NOTE: Unfortunately C* 3.10 does not yet support 2i on materialized views.
   }

   public void dropView(HelenusEntity entity) {
     sessionOps.execute(
-        SchemaUtil.dropMaterializedView(sessionOps.usingKeyspace(), entity.getName().toCql(), entity));
+        SchemaUtil.dropMaterializedView(
+            sessionOps.usingKeyspace(), entity.getName().toCql(), entity));
   }

   public void updateView(TableMetadata tmd, HelenusEntity entity) {
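Editor's note: `createView` delegates CQL generation to `SchemaUtil.createMaterializedView`. For orientation only, a rough sketch of the shape of statement this wraps — keyspace, view, and column names are hypothetical, and this is not SchemaUtil's actual output:

public class MaterializedViewCqlSketch {
  public static void main(String[] args) {
    // Cassandra 3.x materialized-view DDL, illustrating the statement shape only.
    String cql =
        "CREATE MATERIALIZED VIEW IF NOT EXISTS ks.cyclist_by_age AS "
            + "SELECT age, cid, name FROM ks.cyclist "
            + "WHERE age IS NOT NULL AND cid IS NOT NULL "
            + "PRIMARY KEY (age, cid)";
    System.out.println(cql);
  }
}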


@@ -15,12 +15,10 @@
  */
 package net.helenus.core;

-import com.datastax.driver.core.Statement;
 import com.google.common.base.Stopwatch;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeoutException;
 import net.helenus.core.cache.Facet;
 import net.helenus.core.operation.AbstractOperation;


@@ -30,7 +30,9 @@ public class BoundFacet extends Facet<String> {
     this.properties.put(property, value);
   }

-  public Set<HelenusProperty> getProperties() { return properties.keySet(); }
+  public Set<HelenusProperty> getProperties() {
+    return properties.keySet();
+  }

   public BoundFacet(String name, Map<HelenusProperty, Object> properties) {
     super(


@@ -1,5 +1,10 @@
 package net.helenus.core.cache;

+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import net.helenus.core.Helenus;
 import net.helenus.core.reflect.Entity;
 import net.helenus.core.reflect.MapExportable;
@@ -7,13 +12,6 @@ import net.helenus.mapping.HelenusEntity;
 import net.helenus.mapping.HelenusProperty;
 import net.helenus.mapping.MappingUtil;
 import net.helenus.mapping.value.BeanColumnValueProvider;
-import net.helenus.support.HelenusException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;

 public class CacheUtil {
@@ -28,7 +26,8 @@ public class CacheUtil {
     return out;
   }

-  private static void kCombinations(List<String> items, int n, int k, String[] arr, List<String[]> out) {
+  private static void kCombinations(
+      List<String> items, int n, int k, String[] arr, List<String[]> out) {
     if (k == 0) {
       out.add(arr.clone());
     } else {
@@ -41,11 +40,12 @@ public class CacheUtil {
   public static List<String> flatKeys(String table, List<Facet> facets) {
     return flattenFacets(facets)
         .stream()
-        .map(combination -> {
+        .map(
+            combination -> {
               return table + "." + Arrays.toString(combination);
             })
         .collect(Collectors.toList());
   }
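Editor's note: `flatKeys` derives one cache key per facet combination by joining the table name with `Arrays.toString` of the combination. A runnable sketch of the resulting key format — the facet names and values are hypothetical:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatKeySketch {
  public static void main(String[] args) {
    // Hypothetical facet combinations for a "widget" table.
    List<String[]> combinations =
        Arrays.asList(new String[] {"id==100"}, new String[] {"name==a", "type==b"});
    List<String> keys =
        combinations
            .stream()
            .map(combination -> "widget" + "." + Arrays.toString(combination))
            .collect(Collectors.toList());
    System.out.println(keys); // [widget.[id==100], widget.[name==a, type==b]]
  }
}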
  public static List<String[]> flattenFacets(List<Facet> facets) {

@@ -61,130 +61,136 @@ public class CacheUtil {
             })
         .collect(Collectors.toList()));
     // TODO(gburd): rework so as to not generate the combinations at all rather than filter
-    facets = facets.stream()
-        .filter(f -> !f.fixed())
-        .filter(f -> !f.alone() || !f.combined())
-        .collect(Collectors.toList());
+    facets =
+        facets
+            .stream()
+            .filter(f -> !f.fixed())
+            .filter(f -> !f.alone() || !f.combined())
+            .collect(Collectors.toList());
     for (Facet facet : facets) {
-      combinations = combinations
-          .stream()
-          .filter(combo -> {
-            // When used alone, this facet is not distinct so don't use it as a key.
-            if (combo.length == 1) {
-              if (!facet.alone() && combo[0].startsWith(facet.name() + "==")) {
-                return false;
-              }
-            } else {
-              if (!facet.combined()) {
-                for (String c : combo) {
-                  // Don't use this facet in combination with others to create keys.
-                  if (c.startsWith(facet.name() + "==")) {
-                    return false;
-                  }
-                }
-              }
-            }
-            return true;
-          })
-          .collect(Collectors.toList());
+      combinations =
+          combinations
+              .stream()
+              .filter(
+                  combo -> {
+                    // When used alone, this facet is not distinct so don't use it as a key.
+                    if (combo.length == 1) {
+                      if (!facet.alone() && combo[0].startsWith(facet.name() + "==")) {
+                        return false;
+                      }
+                    } else {
+                      if (!facet.combined()) {
+                        for (String c : combo) {
+                          // Don't use this facet in combination with others to create keys.
+                          if (c.startsWith(facet.name() + "==")) {
+                            return false;
+                          }
+                        }
+                      }
+                    }
+                    return true;
+                  })
+              .collect(Collectors.toList());
     }
     return combinations;
   }
-  /**
-   * Merge changed values in the map behind `from` into `to`.
-   */
+  /** Merge changed values in the map behind `from` into `to`. */
   public static Object merge(Object t, Object f) {
     HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(t));

     if (t == f) return t;
     if (f == null) return t;
     if (t == null) return f;

-    if (t instanceof MapExportable && t instanceof Entity && f instanceof MapExportable && f instanceof Entity) {
+    if (t instanceof MapExportable
+        && t instanceof Entity
+        && f instanceof MapExportable
+        && f instanceof Entity) {
       Entity to = (Entity) t;
       Entity from = (Entity) f;
       Map<String, Object> toValueMap = ((MapExportable) to).toMap();
       Map<String, Object> fromValueMap = ((MapExportable) from).toMap();
       for (HelenusProperty prop : entity.getOrderedProperties()) {
         switch (prop.getColumnType()) {
           case PARTITION_KEY:
           case CLUSTERING_COLUMN:
             continue;
           default:
             Object toVal = BeanColumnValueProvider.INSTANCE.getColumnValue(to, -1, prop, false);
             Object fromVal = BeanColumnValueProvider.INSTANCE.getColumnValue(from, -1, prop, false);
             String ttlKey = ttlKey(prop);
             String writeTimeKey = writeTimeKey(prop);
             int[] toTtlI = (int[]) toValueMap.get(ttlKey);
             int toTtl = (toTtlI != null) ? toTtlI[0] : 0;
             Long toWriteTime = (Long) toValueMap.get(writeTimeKey);
             int[] fromTtlI = (int[]) fromValueMap.get(ttlKey);
             int fromTtl = (fromTtlI != null) ? fromTtlI[0] : 0;
             Long fromWriteTime = (Long) fromValueMap.get(writeTimeKey);

             if (toVal != null) {
               if (fromVal != null) {
                 if (toVal == fromVal) {
                   // Case: object identity
                   // Goal: ensure write time and ttl are also in sync
-                  if (fromWriteTime != null && fromWriteTime != 0L &&
-                      (toWriteTime == null || fromWriteTime > toWriteTime)) {
+                  if (fromWriteTime != null
+                      && fromWriteTime != 0L
+                      && (toWriteTime == null || fromWriteTime > toWriteTime)) {
                     ((MapExportable) to).put(writeTimeKey, fromWriteTime);
                   }
                   if (fromTtl > 0 && fromTtl > toTtl) {
                     ((MapExportable) to).put(ttlKey, fromTtl);
                   }
                 } else if (fromWriteTime != null && fromWriteTime != 0L) {
                   // Case: to exists and from exists
                   // Goal: copy over from -> to iff from.writeTime > to.writeTime
                   if (toWriteTime != null && toWriteTime != 0L) {
                     if (fromWriteTime > toWriteTime) {
                       ((MapExportable) to).put(prop.getPropertyName(), fromVal);
                       ((MapExportable) to).put(writeTimeKey, fromWriteTime);
                       if (fromTtl > 0) {
                         ((MapExportable) to).put(ttlKey, fromTtl);
                       }
                     }
                   } else {
                     ((MapExportable) to).put(prop.getPropertyName(), fromVal);
                     ((MapExportable) to).put(writeTimeKey, fromWriteTime);
                     if (fromTtl > 0) {
                       ((MapExportable) to).put(ttlKey, fromTtl);
                     }
                   }
                 } else {
                   if (toWriteTime == null || toWriteTime == 0L) {
                     // Caution, entering grey area...
                     if (!toVal.equals(fromVal)) {
                       // dangerous waters here, values diverge without information that enables resolution,
                       // policy (for now) is to move value from -> to anyway.
                       ((MapExportable) to).put(prop.getPropertyName(), fromVal);
                       if (fromTtl > 0) {
                         ((MapExportable) to).put(ttlKey, fromTtl);
                       }
                     }
                   }
                 }
               }
             } else {
               // Case: from exists, but to doesn't (it's null)
               // Goal: copy over from -> to, include ttl and writeTime if present
               if (fromVal != null) {
                 ((MapExportable) to).put(prop.getPropertyName(), fromVal);
                 if (fromWriteTime != null && fromWriteTime != 0L) {
                   ((MapExportable) to).put(writeTimeKey, fromWriteTime);
                 }
                 if (fromTtl > 0) {
                   ((MapExportable) to).put(ttlKey, fromTtl);
                 }
               }
             }
         }
       }
       return to;
     }
     return t;
   }
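Editor's note: the heart of `merge` is a last-writer-wins rule keyed on Cassandra write times, with TTLs carried along. A condensed, runnable sketch of just that reconciliation rule — the signature and names are simplified stand-ins, not the Helenus method:

public class WriteTimeMergeSketch {
  // The value with the newer Cassandra writeTime wins; absent provenance keeps
  // whatever is already cached.
  static Object reconcile(Object toVal, Long toWriteTime, Object fromVal, Long fromWriteTime) {
    if (toVal == null) return fromVal; // nothing cached yet, take `from`
    if (fromWriteTime == null || fromWriteTime == 0L) return toVal; // `from` has no provenance
    if (toWriteTime == null || toWriteTime == 0L || fromWriteTime > toWriteTime) {
      return fromVal; // newer write wins
    }
    return toVal;
  }

  public static void main(String[] args) {
    System.out.println(reconcile("old", 100L, "new", 200L)); // new
    System.out.println(reconcile("old", 200L, "new", 100L)); // old
  }
}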
  public static String schemaName(List<Facet> facets) {

@@ -196,16 +202,18 @@ public class CacheUtil {
   }

   public static String writeTimeKey(HelenusProperty prop) {
     return writeTimeKey(prop.getColumnName().toCql(false));
   }

   public static String ttlKey(HelenusProperty prop) {
     return ttlKey(prop.getColumnName().toCql(false));
   }

   public static String writeTimeKey(String columnName) {
     return "_" + columnName + "_writeTime";
   }

-  public static String ttlKey(String columnName) { return "_" + columnName + "_ttl"; }
+  public static String ttlKey(String columnName) {
+    return "_" + columnName + "_ttl";
+  }
 }


@@ -58,6 +58,11 @@ public class Facet<T> {
     this.combined = combined;
   }

-  public boolean alone() { return alone; }
-  public boolean combined() { return combined; }
+  public boolean alone() {
+    return alone;
+  }
+
+  public boolean combined() {
+    return combined;
+  }
 }


@@ -22,7 +22,8 @@ import java.util.Map;
 import net.helenus.core.*;
 import net.helenus.mapping.HelenusProperty;

-public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>
+public abstract class AbstractFilterStreamOperation<
+        E, O extends AbstractFilterStreamOperation<E, O>>
     extends AbstractStreamOperation<E, O> {

   protected Map<HelenusProperty, Filter<?>> filters = null;

@@ -41,8 +42,7 @@ public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>
   public <V> O where(Getter<V> getter, Operator operator, V val) {

-    if (val != null)
-      addFilter(Filter.create(getter, operator, val));
+    if (val != null) addFilter(Filter.create(getter, operator, val));

     return (O) this;
   }

@@ -63,8 +63,7 @@ public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>
   public <V> O and(Getter<V> getter, Operator operator, V val) {

-    if (val != null)
-      addFilter(Filter.create(getter, operator, val));
+    if (val != null) addFilter(Filter.create(getter, operator, val));

     return (O) this;
   }

@@ -85,8 +84,7 @@ public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>
   public <V> O onlyIf(Getter<V> getter, Operator operator, V val) {

-    if (val != null)
-      addIfFilter(Filter.create(getter, operator, val));
+    if (val != null) addIfFilter(Filter.create(getter, operator, val));

     return (O) this;
   }
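Editor's note: the guard in `where`/`and`/`onlyIf` makes a null value a silent no-op, so optional criteria can be chained unconditionally. A tiny sketch of the same pattern outside Helenus — the `Query` builder and its output are hypothetical:

public class OptionalFilterSketch {
  static class Query {
    private final StringBuilder where = new StringBuilder();

    // Like where()/and() above: a null value silently skips the clause.
    Query and(String column, Object val) {
      if (val != null) {
        where.append(where.length() == 0 ? " WHERE " : " AND ").append(column).append(" = ?");
      }
      return this;
    }

    public String toString() {
      return "SELECT *" + where;
    }
  }

  public static void main(String[] args) {
    // Prints "SELECT * WHERE name = ?" -- the null age clause is a no-op.
    System.out.println(new Query().and("name", "Alice").and("age", null));
  }
}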


@@ -20,8 +20,6 @@ import com.datastax.driver.core.ResultSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeoutException;
-import com.datastax.driver.core.Statement;
 import net.helenus.core.AbstractSessionOperations;
 import net.helenus.core.UnitOfWork;

@@ -41,8 +39,15 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
   public E sync() throws TimeoutException {
     final Timer.Context context = requestLatency.time();
     try {
-      ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits,
-          showValues, false);
+      ResultSet resultSet =
+          this.execute(
+              sessionOps,
+              null,
+              traceContext,
+              queryExecutionTimeout,
+              queryTimeoutUnits,
+              showValues,
+              false);
       return transform(resultSet);
     } finally {
       context.stop();

@@ -54,7 +59,11 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
     final Timer.Context context = requestLatency.time();
     try {
-      ResultSet resultSet = execute(sessionOps, uow, traceContext,
+      ResultSet resultSet =
+          execute(
+              sessionOps,
+              uow,
+              traceContext,
               queryExecutionTimeout,
               queryTimeoutUnits,
               showValues,


@@ -70,24 +70,24 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
       boolean updateCache = isSessionCacheable() && !ignoreCache();
       if (updateCache) {
         List<Facet> facets = bindFacetValues();
         if (facets != null && facets.size() > 0) {
           if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
             String tableName = CacheUtil.schemaName(facets);
             cacheResult = (E) sessionOps.checkCache(tableName, facets);
             if (cacheResult != null) {
               result = Optional.of(cacheResult);
               updateCache = false;
               sessionCacheHits.mark();
               cacheHits.mark();
             } else {
               sessionCacheMiss.mark();
               cacheMiss.mark();
             }
           }
         } else {
           //TODO(gburd): look in statement cache for results
         }
       }
if (!result.isPresent()) { if (!result.isPresent()) {
@@ -108,8 +108,9 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
       if (updateCache && result.isPresent()) {
         E r = result.get();
         Class<?> resultClass = r.getClass();
-        if (!(resultClass.getEnclosingClass() != null && resultClass.getEnclosingClass() == Fun.class)) {
+        if (!(resultClass.getEnclosingClass() != null
+            && resultClass.getEnclosingClass() == Fun.class)) {
           List<Facet> facets = getFacets();
           if (facets != null && facets.size() > 1) {
             sessionOps.updateCache(r, facets);
@@ -137,57 +138,57 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
       try {
         List<Facet> facets = bindFacetValues();
         if (facets != null && facets.size() > 0) {
           if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
             cachedResult = checkCache(uow, facets);
             if (cachedResult != null) {
               updateCache = false;
               result = Optional.of(cachedResult);
               uowCacheHits.mark();
               cacheHits.mark();
               uow.recordCacheAndDatabaseOperationCount(1, 0);
             } else {
               uowCacheMiss.mark();
               if (isSessionCacheable()) {
                 String tableName = CacheUtil.schemaName(facets);
                 cachedResult = (E) sessionOps.checkCache(tableName, facets);
                 Class<?> iface = MappingUtil.getMappingInterface(cachedResult);
                 if (cachedResult != null) {
                   try {
                     if (Drafted.class.isAssignableFrom(iface)) {
                       result = Optional.of(cachedResult);
                     } else {
                       result = Optional.of(MappingUtil.clone(cachedResult));
                     }
                     sessionCacheHits.mark();
                     cacheHits.mark();
                     uow.recordCacheAndDatabaseOperationCount(1, 0);
                   } catch (CloneNotSupportedException e) {
                     result = Optional.empty();
                     sessionCacheMiss.mark();
                     cacheMiss.mark();
                     uow.recordCacheAndDatabaseOperationCount(-1, 0);
                   } finally {
                     if (result.isPresent()) {
                       updateCache = true;
                     } else {
                       updateCache = false;
                     }
                   }
                 } else {
                   updateCache = false;
                   sessionCacheMiss.mark();
                   cacheMiss.mark();
                   uow.recordCacheAndDatabaseOperationCount(-1, 0);
                 }
               } else {
                 updateCache = false;
               }
             }
           } else {
             //TODO(gburd): look in statement cache for results
             cacheMiss.mark();
             uow.recordCacheAndDatabaseOperationCount(-1, 0);
             updateCache = false; //true;
           }
         } else {
           updateCache = false;
@@ -210,8 +211,15 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
       } else {
         // Formulate the query and execute it against the Cassandra cluster.
-        ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
-            showValues, true);
+        ResultSet resultSet =
+            execute(
+                sessionOps,
+                uow,
+                traceContext,
+                queryExecutionTimeout,
+                queryTimeoutUnits,
+                showValues,
+                true);

         // Transform the query result set into the desired shape.
         result = transform(resultSet);
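Editor's note: the branching above implements a two-level lookup — unit-of-work cache first, then the session cache (cloning shared instances), and only then Cassandra. A simplified, runnable sketch of that order, using plain maps in place of the real caches; all names are hypothetical:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class CacheLookupSketch {
  static final Map<String, Object> uowCache = new HashMap<>();
  static final Map<String, Object> sessionCache = new HashMap<>();

  // Look in the unit-of-work cache, then the session cache, then run the query.
  static Object fetch(String key, Supplier<Object> query) {
    Object hit = uowCache.get(key);
    if (hit != null) return hit;
    hit = sessionCache.get(key);
    if (hit != null) {
      uowCache.put(key, hit); // promote so later reads in this UOW stay local
      return hit;
    }
    Object result = query.get();
    uowCache.put(key, result);
    sessionCache.put(key, result);
    return result;
  }

  public static void main(String[] args) {
    System.out.println(fetch("widget.[id==1]", () -> "row-from-cassandra"));
    System.out.println(fetch("widget.[id==1]", () -> "never-called"));
  }
}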


@@ -328,7 +328,8 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>>
   protected Object cacheUpdate(UnitOfWork<?> uow, E pojo, List<Facet> identifyingFacets) {
     List<Facet> facets = new ArrayList<>();
-    Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
+    Map<String, Object> valueMap =
+        pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;

     for (Facet facet : identifyingFacets) {
       if (facet instanceof UnboundFacet) {


@@ -73,21 +73,21 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
       if (!ignoreCache() && isSessionCacheable()) {
         List<Facet> facets = bindFacetValues();
         if (facets != null && facets.size() > 0) {
           if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
             String tableName = CacheUtil.schemaName(facets);
             cacheResult = (E) sessionOps.checkCache(tableName, facets);
             if (cacheResult != null) {
               resultStream = Stream.of(cacheResult);
               updateCache = false;
               sessionCacheHits.mark();
               cacheHits.mark();
             } else {
               sessionCacheMiss.mark();
               cacheMiss.mark();
             }
           }
         } else {
           //TODO(gburd): look in statement cache for results
         }
       }
@@ -114,7 +114,8 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
           resultStream.forEach(
               result -> {
                 Class<?> resultClass = result.getClass();
-                if (!(resultClass.getEnclosingClass() != null && resultClass.getEnclosingClass() == Fun.class)) {
+                if (!(resultClass.getEnclosingClass() != null
+                    && resultClass.getEnclosingClass() == Fun.class)) {
                   sessionOps.updateCache(result, facets);
                 }
                 again.add(result);
@@ -144,57 +145,57 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
         List<Facet> facets = bindFacetValues();
         if (facets != null && facets.size() > 0) {
           if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
             cachedResult = checkCache(uow, facets);
             if (cachedResult != null) {
               updateCache = false;
               resultStream = Stream.of(cachedResult);
               uowCacheHits.mark();
               cacheHits.mark();
               uow.recordCacheAndDatabaseOperationCount(1, 0);
             } else {
               uowCacheMiss.mark();
               if (isSessionCacheable()) {
                 String tableName = CacheUtil.schemaName(facets);
                 cachedResult = (E) sessionOps.checkCache(tableName, facets);
                 Class<?> iface = MappingUtil.getMappingInterface(cachedResult);
                 if (cachedResult != null) {
                   E result = null;
                   try {
                     if (Drafted.class.isAssignableFrom(iface)) {
                       result = cachedResult;
                     } else {
                       result = MappingUtil.clone(cachedResult);
                     }
                     resultStream = Stream.of(result);
                     sessionCacheHits.mark();
                     cacheHits.mark();
                     uow.recordCacheAndDatabaseOperationCount(1, 0);
                   } catch (CloneNotSupportedException e) {
                     resultStream = null;
                     sessionCacheMiss.mark();
                     uow.recordCacheAndDatabaseOperationCount(-1, 0);
                   } finally {
                     if (result != null) {
                       updateCache = true;
                     } else {
                       updateCache = false;
                     }
                   }
                 } else {
                   updateCache = false;
                   sessionCacheMiss.mark();
                   cacheMiss.mark();
                   uow.recordCacheAndDatabaseOperationCount(-1, 0);
                 }
               } else {
                 updateCache = false;
               }
             }
           } else {
             //TODO(gburd): look in statement cache for results
             updateCache = false; //true;
             cacheMiss.mark();
             uow.recordCacheAndDatabaseOperationCount(-1, 0);
           }
         } else {
           updateCache = false;
@@ -209,8 +210,15 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
       // Check to see if we fetched the object from the cache
       if (resultStream == null) {
-        ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
-            showValues, true);
+        ResultSet resultSet =
+            execute(
+                sessionOps,
+                uow,
+                traceContext,
+                queryExecutionTimeout,
+                queryTimeoutUnits,
+                showValues,
+                true);
         resultStream = transform(resultSet);
       }
@@ -221,15 +229,15 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
         List<E> again = new ArrayList<>();
         List<Facet> facets = getFacets();
         resultStream.forEach(
             result -> {
               Class<?> resultClass = result.getClass();
               if (result != deleted
                   && !(resultClass.getEnclosingClass() != null
                       && resultClass.getEnclosingClass() == Fun.class)) {
                 result = (E) cacheUpdate(uow, result, facets);
               }
               again.add(result);
             });
         resultStream = again.stream();
       }
     }


@@ -19,101 +19,122 @@ import com.codahale.metrics.Timer;
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.ResultSet;
 import com.google.common.base.Stopwatch;
-import net.helenus.core.AbstractSessionOperations;
-import net.helenus.core.UnitOfWork;
-import net.helenus.support.HelenusException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
+import net.helenus.core.AbstractSessionOperations;
+import net.helenus.core.UnitOfWork;
+import net.helenus.support.HelenusException;

 public class BatchOperation extends Operation<Long> {
   private BatchStatement batch = null;
   private List<AbstractOperation<?, ?>> operations = new ArrayList<AbstractOperation<?, ?>>();
   private boolean logged = true;
   private long timestamp = 0L;

   public BatchOperation(AbstractSessionOperations sessionOperations) {
     super(sessionOperations);
   }

   public void add(AbstractOperation<?, ?> operation) {
     operations.add(operation);
   }

   @Override
   public BatchStatement buildStatement(boolean cached) {
     batch = new BatchStatement();
-    batch.addAll(operations.stream().map(o -> o.buildStatement(cached)).collect(Collectors.toList()));
+    batch.addAll(
+        operations.stream().map(o -> o.buildStatement(cached)).collect(Collectors.toList()));
     batch.setConsistencyLevel(sessionOps.getDefaultConsistencyLevel());
     timestamp = System.nanoTime();
     batch.setDefaultTimestamp(timestamp);
     return batch;
   }

   public BatchOperation logged() {
     logged = true;
     return this;
   }

   public BatchOperation setLogged(boolean logStatements) {
     logged = logStatements;
     return this;
   }

   public Long sync() throws TimeoutException {
     if (operations.size() == 0) return 0L;
     final Timer.Context context = requestLatency.time();
     try {
       timestamp = System.nanoTime();
       batch.setDefaultTimestamp(timestamp);
-      ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
+      ResultSet resultSet =
+          this.execute(
+              sessionOps,
+              null,
+              traceContext,
+              queryExecutionTimeout,
+              queryTimeoutUnits,
+              showValues,
+              false);
       if (!resultSet.wasApplied()) {
         throw new HelenusException("Failed to apply batch.");
       }
     } finally {
       context.stop();
     }
     return timestamp;
   }

   public Long sync(UnitOfWork<?> uow) throws TimeoutException {
     if (operations.size() == 0) return 0L;
-    if (uow == null)
-      return sync();
+    if (uow == null) return sync();
     final Timer.Context context = requestLatency.time();
     final Stopwatch timer = Stopwatch.createStarted();
     try {
       uow.recordCacheAndDatabaseOperationCount(0, 1);
-      ResultSet resultSet = this.execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
+      ResultSet resultSet =
+          this.execute(
+              sessionOps,
+              uow,
+              traceContext,
+              queryExecutionTimeout,
+              queryTimeoutUnits,
+              showValues,
+              false);
       if (!resultSet.wasApplied()) {
         throw new HelenusException("Failed to apply batch.");
       }
     } finally {
       context.stop();
       timer.stop();
     }
     uow.addDatabaseTime("Cassandra", timer);
     return timestamp;
   }

   public void addAll(BatchOperation batch) {
     batch.operations.forEach(o -> this.operations.add(o));
   }

   public String toString() {
     return toString(true); //TODO(gburd): sessionOps.showQueryValues()
   }

   public String toString(boolean showValues) {
     StringBuilder s = new StringBuilder();
     s.append("BEGIN ");
-    if (!logged) { s.append("UN"); }
+    if (!logged) {
+      s.append("UN");
+    }
     s.append("LOGGED BATCH; ");
-    s.append(operations.stream().map(o -> Operation.queryString(o.buildStatement(showValues), showValues)).collect(Collectors.joining(" ")));
+    s.append(
+        operations
+            .stream()
+            .map(o -> Operation.queryString(o.buildStatement(showValues), showValues))
+            .collect(Collectors.joining(" ")));
     s.append(" APPLY BATCH;");
     return s.toString();
   }
 }
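Editor's note: `toString(boolean)` renders the batch as CQL text. A self-contained sketch of the string it assembles, with hard-coded statements standing in for `Operation.queryString(...)`:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class BatchStringSketch {
  public static void main(String[] args) {
    boolean logged = false;
    List<String> statements =
        Arrays.asList("INSERT INTO w (id) VALUES (1);", "UPDATE w SET n = 2 WHERE id = 1;");
    StringBuilder s = new StringBuilder("BEGIN ");
    if (!logged) {
      s.append("UN");
    }
    s.append("LOGGED BATCH; ");
    s.append(statements.stream().collect(Collectors.joining(" ")));
    s.append(" APPLY BATCH;");
    // BEGIN UNLOGGED BATCH; INSERT ... UPDATE ... APPLY BATCH;
    System.out.println(s);
  }
}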


@@ -19,6 +19,10 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.querybuilder.BuiltStatement;
 import com.datastax.driver.core.querybuilder.Insert;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import net.helenus.core.AbstractSessionOperations;
 import net.helenus.core.Getter;
 import net.helenus.core.Helenus;

@@ -38,14 +42,10 @@ import net.helenus.support.Fun;
 import net.helenus.support.HelenusException;
 import net.helenus.support.HelenusMappingException;

-import java.util.*;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
-import java.util.stream.Collectors;

 public final class InsertOperation<T> extends AbstractOperation<T, InsertOperation<T>> {

-  private final List<Fun.Tuple2<HelenusPropertyNode, Object>> values = new ArrayList<Fun.Tuple2<HelenusPropertyNode, Object>>();
+  private final List<Fun.Tuple2<HelenusPropertyNode, Object>> values =
+      new ArrayList<Fun.Tuple2<HelenusPropertyNode, Object>>();
   private final T pojo;
   private final Class<?> resultType;
   private HelenusEntity entity;
@@ -63,7 +63,11 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperation<T>>
     this.resultType = ResultSet.class;
   }

-  public InsertOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity, Class<?> resultType, boolean ifNotExists) {
+  public InsertOperation(
+      AbstractSessionOperations sessionOperations,
+      HelenusEntity entity,
+      Class<?> resultType,
+      boolean ifNotExists) {
     super(sessionOperations);

     this.ifNotExists = ifNotExists;
@@ -72,7 +76,8 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperation<T>>
     this.entity = entity;
   }

-  public InsertOperation(AbstractSessionOperations sessionOperations, Class<?> resultType, boolean ifNotExists) {
+  public InsertOperation(
+      AbstractSessionOperations sessionOperations, Class<?> resultType, boolean ifNotExists) {
     super(sessionOperations);

     this.ifNotExists = ifNotExists;

@@ -80,8 +85,12 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperation<T>>
     this.resultType = resultType;
   }

-  public InsertOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity, T pojo,
-      Set<String> mutations, boolean ifNotExists) {
+  public InsertOperation(
+      AbstractSessionOperations sessionOperations,
+      HelenusEntity entity,
+      T pojo,
+      Set<String> mutations,
+      boolean ifNotExists) {
     super(sessionOperations);

     this.entity = entity;
@@ -144,16 +153,28 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperation<T>>
   @Override
   public BuiltStatement buildStatement(boolean cached) {

-    List<HelenusEntity> entities = values.stream().map(t -> t._1.getProperty().getEntity()).distinct().collect(Collectors.toList());
+    List<HelenusEntity> entities =
+        values
+            .stream()
+            .map(t -> t._1.getProperty().getEntity())
+            .distinct()
+            .collect(Collectors.toList());
     if (entities.size() != 1) {
-      throw new HelenusMappingException("you can insert only single entity at a time, found: "
-          + entities.stream().map(e -> e.getMappingInterface().toString()).collect(Collectors.joining(", ")));
+      throw new HelenusMappingException(
+          "you can insert only single entity at a time, found: "
+              + entities
+                  .stream()
+                  .map(e -> e.getMappingInterface().toString())
+                  .collect(Collectors.joining(", ")));
     }
     HelenusEntity entity = entities.get(0);
     if (this.entity != null) {
       if (this.entity != entity) {
-        throw new HelenusMappingException("you can insert only single entity at a time, found: " +
-            this.entity.getMappingInterface().toString() + ", " + entity.getMappingInterface().toString());
+        throw new HelenusMappingException(
+            "you can insert only single entity at a time, found: "
+                + this.entity.getMappingInterface().toString()
+                + ", "
+                + entity.getMappingInterface().toString());
       }
     } else {
       this.entity = entity;
@@ -188,52 +209,53 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperation<T>>
     return insert;
   }

   private T newInstance(Class<?> iface) {
     if (values.size() > 0) {
       boolean immutable = iface.isAssignableFrom(Drafted.class);
       Collection<HelenusProperty> properties = entity.getOrderedProperties();
       Map<String, Object> backingMap = new HashMap<String, Object>(properties.size());

       // First, add all the inserted values into our new map.
       values.forEach(t -> backingMap.put(t._1.getProperty().getPropertyName(), t._2));

       // Then, fill in all the rest of the properties.
       for (HelenusProperty prop : properties) {
         String key = prop.getPropertyName();
         if (backingMap.containsKey(key)) {
           // Some values may need to be converted (e.g. from String to Enum). This is done
           // within the BeanColumnValueProvider below.
-          Optional<Function<Object, Object>> converter = prop.getReadConverter(
-              sessionOps.getSessionRepository());
+          Optional<Function<Object, Object>> converter =
+              prop.getReadConverter(sessionOps.getSessionRepository());
           if (converter.isPresent()) {
             backingMap.put(key, converter.get().apply(backingMap.get(key)));
           }
         } else {
           // If we started this operation with an instance of this type, use values from
           // that.
           if (pojo != null) {
-            backingMap.put(key, BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, immutable));
+            backingMap.put(
+                key, BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, immutable));
           } else {
             // Otherwise we'll use default values for the property type if available.
             Class<?> propType = prop.getJavaType();
             if (propType.isPrimitive()) {
               DefaultPrimitiveTypes type = DefaultPrimitiveTypes.lookup(propType);
               if (type == null) {
                 throw new HelenusException("unknown primitive type " + propType);
               }
               backingMap.put(key, type.getDefaultValue());
             }
           }
         }
       }

       // Lastly, create a new proxy object for the entity and return the new instance.
       return (T) Helenus.map(iface, backingMap);
     }
     return null;
   }

   @Override
   public T transform(ResultSet resultSet) {
     if ((ifNotExists == true) && (resultSet.wasApplied() == false)) {
       throw new HelenusException("Statement was not applied due to consistency constraints");
@@ -241,12 +263,12 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati

    Class<?> iface = entity.getMappingInterface();
    if (resultType == iface) {
      T o = newInstance(iface);
      if (o == null) {
        // Oddly, this insert didn't change anything so simply return the pojo.
        return (T) pojo;
      }
      return o;
    }
    return (T) resultSet;
  }
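For orientation (not part of this commit): when an insert round-trips, transform() hands back either the original pojo (when newInstance() produced nothing new) or a freshly mapped proxy. A minimal sketch, with names borrowed from the UnitOfWorkTest later in this diff:

    // sync() funnels the driver's ResultSet through transform(), so the returned
    // Widget is either the pojo this operation started with or a new proxy.
    Widget w =
        session
            .<Widget>insert(widget)
            .value(widget::id, key)
            .value(widget::name, RandomString.make(20))
            .sync();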
@@ -265,17 +287,20 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati

  protected void adjustTtlAndWriteTime(MapExportable pojo) {
    if (ttl != null || writeTime != 0L) {
      List<String> columnNames =
          values
              .stream()
              .map(t -> t._1.getProperty())
              .filter(
                  prop -> {
                    switch (prop.getColumnType()) {
                      case PARTITION_KEY:
                      case CLUSTERING_COLUMN:
                        return false;
                      default:
                        return true;
                    }
                  })
              .map(prop -> prop.getColumnName().toCql(false))
              .collect(Collectors.toList());
@@ -294,7 +319,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati

  public T sync() throws TimeoutException {
    T result = super.sync();
    if (entity.isCacheable() && result != null) {
      adjustTtlAndWriteTime((MapExportable) result);
      sessionOps.updateCache(result, bindFacetValues());
    }
    return result;
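For context (not in the commit): adjustTtlAndWriteTime() is what makes TTL and write-time metadata visible on cached entities after a sync. The UnitOfWorkTest later in this diff exercises it roughly like this:

    // After an update with a TTL, the entity's metadata reflects it
    // (the test below asserts this on w1, which the update returns in place).
    w2 =
        session
            .<Widget>update(w1)
            .set(widget::name, RandomString.make(10))
            .where(widget::id, eq(key))
            .usingTtl(30)
            .sync(uow);
    Assert.assertTrue(30 == w1.ttlOf(widget::name));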
@@ -339,8 +364,8 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati

      if (resultType == iface) {
        final T result = (pojo == null) ? newInstance(iface) : pojo;
        if (result != null) {
          adjustTtlAndWriteTime((MapExportable) result);
          cacheUpdate(uow, result, bindFacetValues());
        }
        uow.batch(this);
        return (T) result;
View file
@@ -70,7 +70,7 @@ public abstract class Operation<E> {
  }

  public static String queryString(BatchOperation operation, boolean includeValues) {
    return operation.toString(includeValues);
  }

  public static String queryString(Statement statement, boolean includeValues) {
@@ -92,8 +92,15 @@ public abstract class Operation<E> {
    return query;
  }

  public ResultSet execute(
      AbstractSessionOperations session,
      UnitOfWork uow,
      TraceContext traceContext,
      long timeout,
      TimeUnit units,
      boolean showValues,
      boolean cached)
      throws TimeoutException {

    // Start recording in a Zipkin sub-span our execution time to perform this operation.
    Tracer tracer = session.getZipkinTracer();
@@ -111,11 +118,17 @@ public abstract class Operation<E> {
      Statement statement = options(buildStatement(cached));

      if (session.isShowCql()) {
        String stmt =
            (this instanceof BatchOperation)
                ? queryString((BatchOperation) this, showValues)
                : queryString(statement, showValues);
        session.getPrintStream().println(stmt);
      } else if (LOG.isDebugEnabled()) {
        String stmt =
            (this instanceof BatchOperation)
                ? queryString((BatchOperation) this, showValues)
                : queryString(statement, showValues);
        LOG.info("CQL> " + stmt);
      }
@@ -135,7 +148,9 @@ public abstract class Operation<E> {
              .map(InetAddress::toString)
              .collect(Collectors.joining(", "));
          ConsistencyLevel cl = ei.getAchievedConsistencyLevel();
          if (cl == null) {
            cl = statement.getConsistencyLevel();
          }
          int se = ei.getSpeculativeExecutions();
          String warn = ei.getWarnings().stream().collect(Collectors.joining(", "));
          String ri =
@@ -148,7 +163,8 @@ public abstract class Operation<E> {
                  qh.getRack(),
                  (cl != null)
                      ? (" consistency: "
                          + cl.name()
                          + " "
                          + (cl.isDCLocal() ? " DC " : "")
                          + (cl.isSerial() ? " SC " : ""))
                      : "",
@@ -188,7 +204,8 @@ public abstract class Operation<E> {
          timerString = String.format(" %s ", timer.toString());
        }
        LOG.info(
            String.format(
                "%s%s%s", uowString, timerString, Operation.queryString(statement, showValues)));
      }
    }
View file
@@ -65,5 +65,7 @@ public final class SelectFirstOperation<E>
  }

  @Override
  public boolean ignoreCache() {
    return delegate.ignoreCache();
  }
}
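Aside (not in the commit): ignoreCache() is the per-operation escape hatch that, presumably, the uncached() call seen in the tests later in this diff toggles, forcing a read to skip both the unit-of-work and session caches:

    // Sketch: bypass caches and force a database read (uncached() per the tests below).
    Widget w5 =
        session
            .<Widget>select(Widget.class)
            .where(widget::id, eq(key))
            .uncached()
            .single()
            .sync()
            .orElse(null);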
View file
@@ -58,5 +58,7 @@ public final class SelectFirstTransformingOperation<R, E>
  }

  @Override
  public boolean ignoreCache() {
    return delegate.ignoreCache();
  }
}
View file
@@ -23,7 +23,6 @@ import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.querybuilder.Select.Selection;
import com.datastax.driver.core.querybuilder.Select.Where;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -97,7 +96,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
    this.implementsEntityType = Entity.class.isAssignableFrom(entity.getMappingInterface());
  }

  public SelectOperation(
      AbstractSessionOperations sessionOperations,
      HelenusEntity entity,
      Function<Row, E> rowMapper) {
    super(sessionOperations);

    this.rowMapper = rowMapper;
@@ -112,8 +114,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
    this.implementsEntityType = Entity.class.isAssignableFrom(entity.getMappingInterface());
  }

  public SelectOperation(
      AbstractSessionOperations sessionOperations,
      Function<Row, E> rowMapper,
      HelenusPropertyNode... props) {
    super(sessionOperations);
@@ -310,7 +314,9 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
      for (Filter<?> filter : filters.values()) {
        where.and(filter.getClause(sessionOps.getValuePreparer()));
        HelenusProperty filterProp = filter.getNode().getProperty();
        HelenusProperty prop =
            props
                .stream()
                .map(HelenusPropertyNode::getProperty)
                .filter(thisProp -> thisProp.getPropertyName().equals(filterProp.getPropertyName()))
                .findFirst()
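For context (not in the commit): each where()/and() filter registered on the operation is folded into the generated WHERE clause by the loop above. Usage, as in the UnitOfWorkTest later in this diff:

    // Both filters end up as AND'ed clauses on the generated SELECT.
    w4 =
        session
            .<Widget>select(widget)
            .where(widget::a, eq(w3.a()))
            .and(widget::b, eq(w3.b()))
            .single()
            .sync()
            .orElse(null);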
View file
@@ -58,8 +58,12 @@ public final class SelectTransformingOperation<R, E>
  }

  @Override
  public boolean isSessionCacheable() {
    return delegate.isSessionCacheable();
  }

  @Override
  public boolean ignoreCache() {
    return delegate.ignoreCache();
  }
}
View file
@@ -34,13 +34,10 @@ import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusProperty;
import net.helenus.mapping.MappingUtil;
import net.helenus.mapping.value.BeanColumnValueProvider;
import net.helenus.support.HelenusException;
import net.helenus.support.HelenusMappingException;
import net.helenus.support.Immutables;

public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateOperation<E>> {
@@ -110,7 +107,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO

    if (pojo != null) {
      if (!BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop).equals(v)) {
        String key = prop.getPropertyName();
        ((MapExportable) pojo).put(key, v);
      }
    }
@@ -421,7 +418,8 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
    Object valueObj = value;
    Optional<Function<Object, Object>> converter =
        prop.getWriteConverter(sessionOps.getSessionRepository());
    if (converter.isPresent()) {
      List convertedList = (List) converter.get().apply(Immutables.listOf(value));
      valueObj = convertedList.get(0);
@@ -436,7 +434,8 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
    List valueObj = value;
    Optional<Function<Object, Object>> converter =
        prop.getWriteConverter(sessionOps.getSessionRepository());
    if (converter.isPresent()) {
      valueObj = (List) converter.get().apply(value);
    }
@@ -581,7 +580,8 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
    HelenusProperty prop = p.getProperty();
    Object valueObj = value;
    Optional<Function<Object, Object>> converter =
        prop.getWriteConverter(sessionOps.getSessionRepository());
    if (converter.isPresent()) {
      Set convertedSet = (Set) converter.get().apply(Immutables.setOf(value));
      valueObj = convertedSet.iterator().next();
@@ -595,7 +595,8 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
    HelenusProperty prop = p.getProperty();
    Set valueObj = value;
    Optional<Function<Object, Object>> converter =
        prop.getWriteConverter(sessionOps.getSessionRepository());
    if (converter.isPresent()) {
      valueObj = (Set) converter.get().apply(value);
    }
@@ -634,9 +635,11 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
      facet = null;
    }

    Optional<Function<Object, Object>> converter =
        prop.getWriteConverter(sessionOps.getSessionRepository());
    if (converter.isPresent()) {
      Map<Object, Object> convertedMap =
          (Map<Object, Object>) converter.get().apply(Immutables.mapOf(key, value));
      for (Map.Entry<Object, Object> e : convertedMap.entrySet()) {
        assignments.put(QueryBuilder.put(p.getColumnName(), e.getKey(), e.getValue()), facet);
      }
@@ -672,7 +675,8 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
      facet = null;
    }

    Optional<Function<Object, Object>> converter =
        prop.getWriteConverter(sessionOps.getSessionRepository());
    if (converter.isPresent()) {
      Map convertedMap = (Map) converter.get().apply(map);
      assignments.put(QueryBuilder.putAll(p.getColumnName(), convertedMap), facet);
@@ -789,7 +793,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
        adjustTtlAndWriteTime(draft);
      } else if (pojo != null) {
        sessionOps.updateCache(pojo, bindFacetValues());
        adjustTtlAndWriteTime((MapExportable) pojo);
      } else {
        sessionOps.cacheEvict(bindFacetValues());
      }
@@ -811,7 +815,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
        cacheUpdate(uow, result, bindFacetValues());
      } else if (pojo != null) {
        cacheUpdate(uow, (E) pojo, bindFacetValues());
        adjustTtlAndWriteTime((MapExportable) pojo);
        return (E) pojo;
      }
      return result;
@@ -828,7 +832,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
        adjustTtlAndWriteTime(draft);
      } else if (pojo != null) {
        result = (E) pojo;
        adjustTtlAndWriteTime((MapExportable) pojo);
      } else {
        result = null;
      }
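For context (not in the commit): the draft branch above is reached when an update is built from an entity draft; TTL and timestamp settings flow into adjustTtlAndWriteTime(). From the EntityDraftBuilderTest in this diff:

    // Update through a draft; usingTtl/defaultTimestamp feed the TTL/write-time bookkeeping.
    d1 = s3.update().setCode("WIDGET-002-UPDATED");
    s4 =
        session.update(d1).usingTtl(20).defaultTimestamp(System.currentTimeMillis()).sync(uow2);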
View file
@@ -18,12 +18,22 @@ package net.helenus.core.reflect;

import net.helenus.core.Getter;

public interface Entity {
  String WRITTEN_AT_METHOD = "writtenAt";
  String TTL_OF_METHOD = "ttlOf";

  default Long writtenAt(Getter getter) {
    return 0L;
  }

  default Long writtenAt(String prop) {
    return 0L;
  }

  default Integer ttlOf(Getter getter) {
    return 0;
  }

  default Integer ttlOf(String prop) {
    return 0;
  }
}
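Not part of the commit: these defaults are only reached when no cached metadata exists; otherwise the mapper intercepts the call (see MapperInvocationHandler below). Callers probe the metadata as in the tests later in this diff:

  // Read server-side metadata off a fetched entity; 0L/0 mean "not known/not set".
  Long at = w3.writtenAt(widget::name);  // write timestamp recorded for the column
  Integer ttl = w3.ttlOf(widget::name);  // remaining TTL for the column, in seconds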
View file
@@ -15,17 +15,20 @@
 */
package net.helenus.core.reflect;

import java.util.Map;
import net.helenus.core.Getter;

public interface MapExportable {
  String TO_MAP_METHOD = "toMap";
  String PUT_METHOD = "put";

  Map<String, Object> toMap();

  default Map<String, Object> toMap(boolean mutable) {
    return null;
  }

  default void put(String key, Object value) {}

  default <T> void put(Getter<T> getter, T value) {}
}
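Not part of the commit: per the MapperInvocationHandler changes below, toMap() returns an unmodifiable view while toMap(true) exposes the live backing map, and put() writes through to it. A sketch:

  Map<String, Object> frozen = w1.toMap();    // Collections.unmodifiableMap over the backing map
  Map<String, Object> live = w1.toMap(true);  // the mutable backing map itself
  w1.put(widget::name, "renamed");            // write through via the Getter overload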
View file
@@ -15,14 +15,6 @@
 */
package net.helenus.core.reflect;

import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamException;
@@ -36,6 +28,13 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import net.helenus.core.Getter;
import net.helenus.core.Helenus;
import net.helenus.core.cache.CacheUtil;
import net.helenus.mapping.MappingUtil;
import net.helenus.mapping.annotation.Transient;
import net.helenus.mapping.value.ValueProviderMap;
import net.helenus.support.HelenusException;

public class MapperInvocationHandler<E> implements InvocationHandler, Serializable {
  private static final long serialVersionUID = -7044209982830584984L;
@@ -101,7 +100,7 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
      }
    }
    if (otherObj instanceof MapExportable) {
      return MappingUtil.compareMaps((MapExportable) otherObj, src);
    }
    return false;
  }
@@ -111,7 +110,7 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
      if (args[0] instanceof String) {
        key = (String) args[0];
      } else if (args[0] instanceof Getter) {
        key = MappingUtil.resolveMappingProperty((Getter) args[0]).getProperty().getPropertyName();
      } else {
        key = null;
      }
@@ -128,14 +127,19 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
    if (Entity.WRITTEN_AT_METHOD.equals(methodName) && method.getParameterCount() == 1) {
      final String key;
      if (args[0] instanceof String) {
        key = CacheUtil.writeTimeKey((String) args[0]);
      } else if (args[0] instanceof Getter) {
        Getter getter = (Getter) args[0];
        key =
            CacheUtil.writeTimeKey(
                MappingUtil.resolveMappingProperty(getter)
                    .getProperty()
                    .getColumnName()
                    .toCql(false));
      } else {
        return 0L;
      }
      Long v = (Long) src.get(key);
      if (v != null) {
        return v;
      }
@@ -145,14 +149,19 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
    if (Entity.TTL_OF_METHOD.equals(methodName) && method.getParameterCount() == 1) {
      final String key;
      if (args[0] instanceof String) {
        key = CacheUtil.ttlKey((String) args[0]);
      } else if (args[0] instanceof Getter) {
        Getter getter = (Getter) args[0];
        key =
            CacheUtil.ttlKey(
                MappingUtil.resolveMappingProperty(getter)
                    .getProperty()
                    .getColumnName()
                    .toCql(false));
      } else {
        return 0;
      }
      int[] v = (int[]) src.get(key);
      if (v != null) {
        return v[0];
      }
@@ -185,7 +194,9 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab

    if (MapExportable.TO_MAP_METHOD.equals(methodName)) {
      if (method.getParameterCount() == 1 && args[0] instanceof Boolean) {
        if ((boolean) args[0]) {
          return src;
        }
      }
      return Collections.unmodifiableMap(src);
    }
View file
@@ -117,7 +117,8 @@ public final class HelenusMappingEntity implements HelenusEntity {
      if (iface.getDeclaredAnnotation(MaterializedView.class) == null) {
        facetsBuilder.add(new Facet("table", name.toCql()).setFixed());
      } else {
        facetsBuilder.add(
            new Facet("table", Helenus.entity(iface.getInterfaces()[0]).getName().toCql())
                .setFixed());
      }
      for (HelenusProperty prop : orderedProps) {
@@ -131,7 +132,8 @@ public final class HelenusMappingEntity implements HelenusEntity {
          facetsBuilder.add(new UnboundFacet(primaryKeyProperties));
          primaryKeyProperties = null;
        }
        for (ConstraintValidator<?, ?> constraint :
            MappingUtil.getValidators(prop.getGetterMethod())) {
          if (constraint.getClass().isAssignableFrom(DistinctValidator.class)) {
            DistinctValidator validator = (DistinctValidator) constraint;
            String[] values = validator.constraintAnnotation.value();
View file
@@ -129,9 +129,13 @@ public final class MappingUtil {
  }

  public static HelenusProperty getPropertyForColumn(HelenusEntity entity, String name) {
    if (name == null) return null;
    return entity
        .getOrderedProperties()
        .stream()
        .filter(p -> p.getColumnName().equals(name))
        .findFirst()
        .orElse(null);
  }

  public static String getDefaultColumnName(Method getter) {
@@ -331,22 +335,24 @@ public final class MappingUtil {

  public static boolean compareMaps(MapExportable me, Map<String, Object> m2) {
    Map<String, Object> m1 = me.toMap();
    List<String> matching =
        m2.entrySet()
            .stream()
            .filter(e -> !e.getKey().matches("^_.*_(ttl|writeTime)$"))
            .filter(
                e -> {
                  String k = e.getKey();
                  if (m1.containsKey(k)) {
                    Object o1 = e.getValue();
                    Object o2 = m1.get(k);
                    if (o1 == o2 || o1.equals(o2)) return true;
                  }
                  return false;
                })
            .map(e -> e.getKey())
            .collect(Collectors.toList());
    List<String> divergent =
        m1.entrySet()
            .stream()
            .filter(e -> !e.getKey().matches("^_.*_(ttl|writeTime)$"))
            .filter(e -> !matching.contains(e.getKey()))
            .map(e -> e.getKey())
            .collect(Collectors.toList());
    return divergent.isEmpty();
  }
}
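For context (not in the commit): this is what backs entity equals() in MapperInvocationHandler above; two mapped objects compare equal when no non-metadata key of one diverges from the other:

  // TTL/write-time bookkeeping keys (matching ^_.*_(ttl|writeTime)$) are ignored.
  boolean same = MappingUtil.compareMaps((MapExportable) w1, w2.toMap());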
View file
@@ -238,7 +238,6 @@ public final class Constraints {
    boolean alone() default true;

    boolean combined() default true;
  }
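For context (not in the commit): these attributes are consumed by DistinctValidator below and by the facet builder in HelenusMappingEntity above; the Widget test entity later in this diff uses them like so:

  @Constraints.Distinct(alone = false)
  String c();

  @Constraints.Distinct(combined = false)
  String d();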
/**
View file
@@ -48,5 +48,4 @@ public final class DistinctValidator
  public boolean combined() {
    return annotation == null || annotation.combined();
  }
}
View file
@@ -25,7 +25,8 @@ public enum BeanColumnValueProvider implements ColumnValueProvider {
  INSTANCE;

  @Override
  public <V> V getColumnValue(
      Object bean, int columnIndexUnused, HelenusProperty property, boolean immutable) {

    Method getter = property.getGetterMethod();
View file
@@ -40,7 +40,8 @@ public final class RowColumnValueProvider implements ColumnValueProvider {
  }

  @Override
  public <V> V getColumnValue(
      Object sourceObj, int columnIndex, HelenusProperty property, boolean immutable) {

    Row source = (Row) sourceObj;
View file
@@ -16,7 +16,6 @@
package net.helenus.mapping.value;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -154,8 +153,9 @@ public final class ValueProviderMap implements Map<String, Object> {
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null
        || !(o.getClass().isAssignableFrom(Map.class)
            || o.getClass().getSimpleName().equals("UnmodifiableMap"))) return false;

    Map that = (Map) o;
    if (this.size() != that.size()) return false;
View file
@@ -79,20 +79,22 @@ public class EntityDraftBuilderTest extends AbstractEmbeddedCassandraTest {

  @Test
  public void testFoo() throws Exception {
    Supply s1 =
        session
            .<Supply>select(Supply.class)
            .where(supply::id, eq(id))
            .and(supply::region, eq(region))
            .single()
            .sync()
            .orElse(null);

    // List
    Supply s2 =
        session
            .<Supply>update(s1.update())
            .and(supply::region, eq(region))
            .prepend(supply::suppliers, "Pignose Supply, LLC.")
            .sync();
    Assert.assertEquals(s2.suppliers().get(0), "Pignose Supply, LLC.");
@@ -108,54 +110,55 @@ public class EntityDraftBuilderTest extends AbstractEmbeddedCassandraTest {

  @Test
  public void testDraftMergeInNestedUow() throws Exception {
    Supply s1, s2, s3, s4, s5;
    Supply.Draft d1;

    s1 =
        session
            .<Supply>select(Supply.class)
            .where(supply::id, eq(id))
            .and(supply::region, eq(region))
            .single()
            .sync()
            .orElse(null);

    try (UnitOfWork uow1 = session.begin()) {
      s2 =
          session
              .<Supply>select(Supply.class)
              .where(supply::id, eq(id))
              .and(supply::region, eq(region))
              .single()
              .sync(uow1)
              .orElse(null);

      try (UnitOfWork uow2 = session.begin(uow1)) {
        s3 =
            session
                .<Supply>select(Supply.class)
                .where(supply::id, eq(id))
                .and(supply::region, eq(region))
                .single()
                .sync(uow2)
                .orElse(null);

        d1 = s3.update().setCode("WIDGET-002-UPDATED");

        s4 =
            session.update(d1).usingTtl(20).defaultTimestamp(System.currentTimeMillis()).sync(uow2);

        uow2.commit();
      }

      s5 =
          session
              .<Supply>select(Supply.class)
              .where(supply::id, eq(id))
              .and(supply::region, eq(region))
              .single()
              .sync(uow1)
              .orElse(null);
    }
  }

  @Test
View file
@@ -1,6 +1,5 @@
package net.helenus.test.integration.core.draft;

import java.util.UUID;
import net.helenus.core.AbstractAuditedEntityDraft;
import net.helenus.core.Helenus;
@@ -92,6 +91,5 @@ public interface Inventory extends Entity, Drafted<Inventory> {
      mutate(inventory::NORAM, count);
      return this;
    }
  }
}
View file
@@ -208,17 +208,13 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest {
  }

  public void testFunTuple() throws TimeoutException {
    Fun.Tuple1<String> tf =
        session.select(user::name).where(user::id, eq(100L)).single().sync().orElse(null);
    if (tf != null) {
      Assert.assertEquals(Fun.class, tf.getClass().getEnclosingClass());
      String name = tf._1;
      Assert.assertEquals("greg", name);
    }
  }

  public void testZipkin() throws TimeoutException {
View file
@@ -19,7 +19,6 @@ import static net.helenus.core.Query.eq;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.utils.UUIDs;
import java.util.Date;
import java.util.UUID;
import net.bytebuddy.utility.RandomString;
@@ -28,7 +27,6 @@ import net.helenus.core.HelenusSession;
import net.helenus.core.UnitOfWork;
import net.helenus.core.annotation.Cacheable;
import net.helenus.core.reflect.Entity;
import net.helenus.mapping.annotation.Constraints;
import net.helenus.mapping.annotation.Index;
import net.helenus.mapping.annotation.PartitionKey;
@@ -53,10 +51,10 @@ interface Widget extends Entity {
  String b();

  @Constraints.Distinct(alone = false)
  String c();

  @Constraints.Distinct(combined = false)
  String d();
}
@@ -132,7 +130,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {

    // This should insert the Widget, and not cache it in uow1.
    try (UnitOfWork uow1 = session.begin()) {
      w1 =
          session
              .<Widget>insert(widget)
              .value(widget::id, key1)
              .value(widget::name, RandomString.make(20))
@@ -144,18 +143,20 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
      try (UnitOfWork uow2 = session.begin(uow1)) {

        // A "SELECT * FROM widget" query does not contain enough information to fetch an item from cache.
        // This will miss, until we implement a statement cache.
        w1a =
            session
                .<Widget>selectAll(Widget.class)
                .sync(uow2)
                .filter(w -> w.id().equals(key1))
                .findFirst()
                .orElse(null);
        Assert.assertTrue(w1.equals(w1a));

        // This should read from uow1's cache and return the same Widget.
        w2 =
            session
                .<Widget>select(widget)
                .where(widget::id, eq(key1))
                .single()
@@ -164,7 +165,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {

        Assert.assertEquals(w1, w2);

        w3 =
            session
                .<Widget>insert(widget)
                .value(widget::id, key2)
                .value(widget::name, RandomString.make(20))
@@ -182,7 +184,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
      }

      // This should read from the cache and get the same instance of a Widget.
      w4 =
          session
              .<Widget>select(widget)
              .where(widget::a, eq(w3.a()))
              .and(widget::b, eq(w3.b()))
@@ -261,11 +264,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
      Assert.assertEquals(w1, w2);

      // This should remove the object from the session cache.
      w3 =
          session.<Widget>update(w2).set(widget::name, "Bill").where(widget::id, eq(key)).sync(uow);

      // Fetch from session cache will cache miss (as it was updated) and trigger a SELECT.
      w4 = session.<Widget>select(widget).where(widget::id, eq(key)).single().sync().orElse(null);
@@ -324,14 +324,15 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
    try (UnitOfWork uow = session.begin()) {

      // This should read from the database and return a Widget.
      w2 =
          session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null);

      // This should remove the object from the cache.
      session.delete(widget).where(widget::id, eq(key)).sync(uow);

      // This should fail to read from the cache.
      w3 =
          session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null);

      Assert.assertEquals(null, w3);
@@ -343,13 +344,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
          });
    }

    w4 = session.<Widget>select(widget).where(widget::id, eq(key)).single().sync().orElse(null);

    Assert.assertEquals(null, w4);
  }
@@ -361,17 +356,21 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
    UUID key = UUIDs.timeBased();

    try (UnitOfWork uow = session.begin()) {
      w1 =
          session
              .<Widget>upsert(widget)
              .value(widget::id, key)
              .value(widget::name, RandomString.make(20))
              .value(widget::a, RandomString.make(10))
              .value(widget::b, RandomString.make(10))
              .value(widget::c, RandomString.make(10))
              .value(widget::d, RandomString.make(10))
              .batch(uow);
      Assert.assertTrue(0L == w1.writtenAt(widget::name));
      Assert.assertTrue(0 == w1.ttlOf(widget::name));
      w2 =
          session
              .<Widget>update(w1)
              .set(widget::name, RandomString.make(10))
              .where(widget::id, eq(key))
              .usingTtl(30)
@@ -379,7 +378,9 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
      Assert.assertEquals(w1, w2);
      Assert.assertTrue(0L == w2.writtenAt(widget::name));
      Assert.assertTrue(30 == w1.ttlOf(widget::name));
      w3 =
          session
              .<Widget>select(Widget.class)
              .where(widget::id, eq(key))
              .single()
              .sync(uow)
@@ -388,14 +389,16 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
      Assert.assertTrue(0L == w3.writtenAt(widget::name));
      Assert.assertTrue(30 <= w3.ttlOf(widget::name));

      w6 =
          session
              .<Widget>upsert(widget)
              .value(widget::id, UUIDs.timeBased())
              .value(widget::name, RandomString.make(20))
              .value(widget::a, RandomString.make(10))
              .value(widget::b, RandomString.make(10))
              .value(widget::c, RandomString.make(10))
              .value(widget::d, RandomString.make(10))
              .batch(uow);

      uow.commit();
      committedAt = uow.committedAt();
@@ -403,7 +406,9 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
      String date = d.toString();
    }
    // 'c' is distinct, but not on its own, so this should miss the cache.
    w4 =
        session
            .<Widget>select(Widget.class)
            .where(widget::c, eq(w3.c()))
            .single()
            .sync()
@@ -413,7 +418,9 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
    //Assert.assertTrue(at == committedAt);
    int ttl4 = w4.ttlOf(widget::name);
    Assert.assertTrue(ttl4 <= 30);
    w5 =
        session
            .<Widget>select(Widget.class)
            .where(widget::id, eq(key))
            .uncached()
            .single()
@@ -433,7 +440,9 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
    try (UnitOfWork uow = session.begin()) {

      // This should insert the Widget, but not cache it.
      w1 =
          session
              .<Widget>insert(widget)
              .value(widget::id, key1)
              .value(widget::name, RandomString.make(20))
              .sync(uow);
@@ -451,12 +460,16 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
    //Assert.assertEquals(w1, w2);
  }

  @Test
  public void testSelectAfterInsertProperlyCachesEntity() throws Exception {
    Widget w1, w2, w3, w4;
    UUID key = UUIDs.timeBased();

    try (UnitOfWork uow = session.begin()) {

      // This should cache the inserted Widget.
      w1 =
          session
              .<Widget>insert(widget)
              .value(widget::id, key)
              .value(widget::name, RandomString.make(20))
              .value(widget::a, RandomString.make(10))
@@ -466,27 +479,27 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
          .sync(uow);

      // This should read from the cache and get the same instance of a Widget.
      w2 =
          session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null);

      uow.commit()
          .andThen(
              () -> {
                Assert.assertEquals(w1, w2);
              });
    }

    // This should read the widget from the session cache and maintain object identity.
    w3 = session.<Widget>select(widget).where(widget::id, eq(key)).single().sync().orElse(null);
    Assert.assertEquals(w1, w3);

    // This should read the widget from the database, no object identity but
    // values should match.
    w4 =
        session
            .<Widget>select(widget)
            .where(widget::id, eq(key))
            .uncached()
            .single()
            .sync()
@@ -496,5 +509,4 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
    Assert.assertTrue(w1.equals(w4));
    Assert.assertTrue(w4.equals(w1));
  }
}
View file
@@ -55,6 +55,5 @@ public interface Account {
    public Map<String, Object> toMap() {
      return null;
    }
  }
}