more progress.

This commit is contained in:
Greg Burd 2017-10-19 13:08:58 -04:00
parent f64d5edd7c
commit 75f32eb542
52 changed files with 995 additions and 641 deletions

31
NOTES
View file

@ -320,3 +320,34 @@ begin:
}
};
}
----------------------------------
if ("ttl".equals(methodName) && method.getParameterCount() == 1 && method.getReturnType() == int.class) {
Getter getter = (Getter) args[0];
if (getter == null) {
return false;
}
HelenusProperty prop = MappingUtil.resolveMappingProperty(getter).getProperty();
String getterName = prop.getPropertyName();
String ttlKeyForProperty = prop.getColumnName().toCql() + "_ttl";
if (src.containsKey(ttlKeyForProperty)) {
return src.get(ttlKeyForProperty);
} else {
return 0;
}
}
if ("written".equals(methodName) && method.getParameterCount() == 1 && method.getReturnType() == int.class) {
Getter getter = (Getter) args[0];
if (getter == null) {
return false;
}
HelenusProperty prop = MappingUtil.resolveMappingProperty(getter).getProperty();
String getterName = prop.getPropertyName();
String ttlKeyForProperty = prop.getColumnName().toCql() + "_ttl";
if (src.containsKey(ttlKeyForProperty)) {
return src.get(ttlKeyForProperty);
} else {
return 0;
}
}

View file

@ -17,22 +17,24 @@ package net.helenus.core;
import com.diffplug.common.base.Errors;
import com.google.common.collect.TreeTraverser;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.*;
import java.util.stream.Collectors;
import net.helenus.core.cache.BoundFacet;
import net.helenus.support.Either;
import org.ahocorasick.trie.Emit;
import org.ahocorasick.trie.Trie;
import java.util.*;
import java.util.stream.Collectors;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfWork<E>, AutoCloseable {
public abstract class AbstractUnitOfWork<E extends Exception>
implements UnitOfWork<E>, AutoCloseable {
private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>();
private final HelenusSession session;
private final AbstractUnitOfWork<E> parent;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
private final Map<String, Either<Object, Set<Object>>> cache = new HashMap<String, Either<Object, Set<Object>>>();
private Trie cacheIndex = Trie.builder().ignoreOverlaps().onlyWholeWordsWhiteSpaceSeparated().build();
private final Map<String, Either<Object, Set<Object>>> cache =
new HashMap<String, Either<Object, Set<Object>>>();
private Trie cacheIndex =
Trie.builder().ignoreOverlaps().onlyWholeWordsWhiteSpaceSeparated().build();
private boolean aborted = false;
private boolean committed = false;
@ -65,48 +67,64 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
}
@Override
public Optional<Either<Object, Set<Object>>> cacheLookupByFacet(Set<EntityIdentifyingFacet> facets) {
Optional<Either<Object, Set<Object>>> result = null;
Collection<Emit> emits = cacheIndex.parseText(String.join(" ", facets.stream()
.map(facet -> facet.toString()).collect(Collectors.toList())));
for (Emit emit : emits) {
// NOTE: rethink. should this match *all* facets? how do I know which emit keyword is the primary key?
String key = emit.getKeyword();
result = cacheLookup(key);
public Optional<Either<Object, Set<Object>>> cacheLookupByFacet(Set<BoundFacet> facets) {
Optional<Either<Object, Set<Object>>> result = Optional.empty();
Collection<Emit> emits =
cacheIndex.parseText(
String.join(
" ", facets.stream().map(facet -> facet.toString()).collect(Collectors.toList())));
for (Emit emit : emits) {
// NOTE: rethink. should this match *all* facets? how do I know which emit keyword is the primary key?
String key = emit.getKeyword();
result = cacheLookup(key);
if (result.isPresent()) {
return result;
}
return result;
}
if (!result.isPresent()) {
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
if (parent != null) {
return parent.cacheLookupByFacet(facets);
}
}
return result;
}
@Override
public Optional<Either<Object, Set<Object>>> cacheLookupByStatement(String[] statementKeys) {
Optional<Either<Object, Set<Object>>> result = null;
String key = String.join(",", statementKeys);
return cacheLookup(key);
String key = String.join(",", statementKeys);
return cacheLookup(key);
}
@Override
public Optional<Either<Object, Set<Object>>> cacheLookup(String key) {
Optional<Either<Object, Set<Object>>> result = Optional.of(cache.get(key));
Optional<Either<Object, Set<Object>>> result =
(cache.containsKey(key)) ? Optional.of(cache.get(key)) : Optional.empty();
if (result.isPresent()) {
return result;
} else {
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
if (parent != null) {
return parent.cacheLookup(key);
}
if (!result.isPresent()) {
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
if (parent != null) {
return parent.cacheLookup(key);
}
return Optional.empty();
}
return result;
}
@Override
public void cacheUpdate(Either<Object, Set<Object>> value, String[] statementKeys, Set<EntityIdentifyingFacet> facets) {
String key = String.join(",", statementKeys);
cache.put(key, value);
Trie.TrieBuilder builder = cacheIndex.builder();
facets.forEach(facet -> {
public void cacheUpdate(
Either<Object, Set<Object>> value, String[] statementKeys, Map<String, BoundFacet> facetMap) {
String key = "CQL::" + String.join(",", statementKeys);
cache.put(key, value);
Trie.TrieBuilder builder =
cacheIndex.builder().ignoreOverlaps().onlyWholeWordsWhiteSpaceSeparated();
facetMap.forEach(
(facetName, facet) -> {
builder.addKeyword(facet.toString());
});
if (facetName.equals("*")) {
cache.put(facet.toString(), value);
}
});
cacheIndex = builder.build();
}
private Iterator<AbstractUnitOfWork<E>> getChildNodes() {
@ -145,7 +163,7 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
// Merge UOW cache into parent's cache.
if (parent != null) {
parent.assumeCache(cache, cacheIndex);
parent.assumeCache(cache, cacheIndex);
}
// Apply all post-commit functions for
@ -181,35 +199,36 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
// cache.invalidateSince(txn::start time)
}
private void assumeCache(Map<String, Either<Object, Set<Object>>> childCache, Trie childCacheIndex) {
for (String key : childCache.keySet()) {
if (cache.containsKey(key)) {
Either<Object, Set<Object>> value = cache.get(key);
if (value.isLeft()) {
Object obj = value.getLeft();
// merge objects
Either<Object, Set<Object>> childValue = childCache.get(key);
if (childValue.isLeft()) {
Object childObj = childValue.getLeft();
} else {
Set<Object> childSet = childValue.getRight();
}
} else {
// merge the sets
Set<Object> set = value.getRight();
Either<Object, Set<Object>> childValue = childCache.get(key);
if (childValue.isLeft()) {
Object childObj = childValue.getLeft();
set.add(childObj);
} else {
Set<Object> childSet = childValue.getRight();
set.addAll(childSet);
}
}
private void assumeCache(
Map<String, Either<Object, Set<Object>>> childCache, Trie childCacheIndex) {
for (String key : childCache.keySet()) {
if (cache.containsKey(key)) {
Either<Object, Set<Object>> value = cache.get(key);
if (value.isLeft()) {
Object obj = value.getLeft();
// merge objects
Either<Object, Set<Object>> childValue = childCache.get(key);
if (childValue.isLeft()) {
Object childObj = childValue.getLeft();
} else {
cache.put(key, childCache.get(key));
Set<Object> childSet = childValue.getRight();
}
} else {
// merge the sets
Set<Object> set = value.getRight();
Either<Object, Set<Object>> childValue = childCache.get(key);
if (childValue.isLeft()) {
Object childObj = childValue.getLeft();
set.add(childObj);
} else {
Set<Object> childSet = childValue.getRight();
set.addAll(childSet);
}
}
} else {
cache.put(key, childCache.get(key));
}
}
}
public String describeConflicts() {

View file

@ -277,7 +277,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
}
DslExportable dsl = (DslExportable) Helenus.dsl(iface);
dsl.setCassandraMetadataForHelenusSesion(session.getCluster().getMetadata());
dsl.setCassandraMetadataForHelenusSession(session.getCluster().getMetadata());
sessionRepository.add(dsl);
});

View file

@ -15,11 +15,11 @@
*/
package net.helenus.core;
import net.helenus.core.cache.EntityIdentifyingFacet;
import net.helenus.support.Either;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import net.helenus.core.cache.BoundFacet;
import net.helenus.support.Either;
public interface UnitOfWork<X extends Exception> extends AutoCloseable {
@ -52,8 +52,11 @@ public interface UnitOfWork<X extends Exception> extends AutoCloseable {
boolean hasCommitted();
Optional<Either<Object, Set<Object>>> cacheLookup(String key);
Optional<Either<Object, Set<Object>>> cacheLookupByFacet(Set<EntityIdentifyingFacet> facets);
Optional<Either<Object, Set<Object>>> cacheLookupByStatement(String[] statementKeys);
void cacheUpdate(Either<Object, Set<Object>> pojo, String[] statementKeys, Set<EntityIdentifyingFacet> facets);
Optional<Either<Object, Set<Object>>> cacheLookupByFacet(Set<BoundFacet> facets);
Optional<Either<Object, Set<Object>>> cacheLookupByStatement(String[] statementKeys);
void cacheUpdate(
Either<Object, Set<Object>> pojo, String[] statementKeys, Map<String, BoundFacet> facets);
}

View file

@ -1,3 +1,18 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.annotation;
import java.lang.annotation.ElementType;

View file

@ -0,0 +1,38 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.cache;
import java.util.Map;
import java.util.stream.Collectors;
import net.helenus.mapping.HelenusProperty;
public class BoundFacet {
private final Map<HelenusProperty, Object> properties;
BoundFacet(Map<HelenusProperty, Object> properties) {
this.properties = properties;
}
public String toString() {
return String.join(
";",
properties
.keySet()
.stream()
.map(key -> properties.get(key).toString())
.collect(Collectors.toSet()));
}
}

View file

@ -1,28 +1,72 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.cache;
import net.helenus.mapping.HelenusProperty;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import net.helenus.mapping.HelenusProperty;
public class EntityIdentifyingFacet extends Facet {
public EntityIdentifyingFacet(HelenusProperty prop) {}
private final Set<HelenusProperty> properties;
public EntityIdentifyingFacet(HelenusProperty[]... props) {}
public EntityIdentifyingFacet(HelenusProperty prop) {
properties = new HashSet<HelenusProperty>();
properties.add(prop);
}
public EntityIdentifyingFacet(Set<HelenusProperty> props) {
properties = props;
}
public boolean isFullyBound() {
return false;
return false;
}
public HelenusProperty getProperty() {
return null;
public Set<HelenusProperty> getProperties() {
return properties;
}
public Set<HelenusProperty> getUnboundEntityProperties() {
return null;
public Binder binder() {
return new Binder(properties);
}
public void setValueForProperty(HelenusProperty prop, Object value) {
}
public static class Binder {
private final Set<HelenusProperty> properties = new HashSet<HelenusProperty>();
private Map<HelenusProperty, Object> boundProperties = new HashMap<HelenusProperty, Object>();
Binder(Set<HelenusProperty> properties) {
this.properties.addAll(properties);
}
public Binder setValueForProperty(HelenusProperty prop, Object value) {
properties.remove(prop);
boundProperties.put(prop, value);
return this;
}
public boolean isFullyBound() {
return properties.isEmpty();
}
public BoundFacet bind() {
return new BoundFacet(boundProperties);
}
}
}

View file

@ -22,7 +22,9 @@ import java.util.Map;
import net.helenus.core.*;
import net.helenus.mapping.HelenusProperty;
public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>> extends AbstractStreamOperation<E, O> {
public abstract class AbstractFilterStreamOperation<
E, O extends AbstractFilterStreamOperation<E, O>>
extends AbstractStreamOperation<E, O> {
protected Map<HelenusProperty, Filter<?>> filters = null;
protected List<Filter<?>> ifFilters = null;

View file

@ -17,13 +17,9 @@ package net.helenus.core.operation;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import java.sql.Time;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import com.diffplug.common.base.Errors;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
@ -47,7 +43,15 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
public E sync() throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
ResultSet resultSet =
this.execute(
sessionOps,
null,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
false);
return transform(resultSet);
} finally {
context.stop();
@ -59,7 +63,15 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, true);
ResultSet resultSet =
execute(
sessionOps,
uow,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
true);
E result = transform(resultSet);
return result;
} finally {
@ -68,19 +80,25 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
}
public CompletableFuture<E> async() {
return CompletableFuture.<E>supplyAsync(() -> {
try {
return CompletableFuture.<E>supplyAsync(
() -> {
try {
return sync();
} catch (TimeoutException ex) { throw new CompletionException(ex); }
});
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
public CompletableFuture<E> async(UnitOfWork uow) {
if (uow == null) return async();
return CompletableFuture.<E>supplyAsync(() -> {
try {
return CompletableFuture.<E>supplyAsync(
() -> {
try {
return sync();
} catch (TimeoutException ex) { throw new CompletionException(ex); }
});
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
}

View file

@ -21,109 +21,125 @@ import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.BoundFacet;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends AbstractStatementOperation<E, O> {
extends AbstractStatementOperation<E, O> {
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
}
public abstract Optional<E> transform(ResultSet resultSet);
public PreparedOptionalOperation<E> prepare() {
return new PreparedOptionalOperation<E>(prepareStatement(), this);
}
public ListenableFuture<PreparedOptionalOperation<E>> prepareAsync() {
final O _this = (O) this;
return Futures.transform(
prepareStatementAsync(),
new Function<PreparedStatement, PreparedOptionalOperation<E>>() {
@Override
public PreparedOptionalOperation<E> apply(PreparedStatement preparedStatement) {
return new PreparedOptionalOperation<E>(preparedStatement, _this);
}
});
}
public Optional<E> sync() throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet =
this.execute(
sessionOps,
null,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
false);
return transform(resultSet);
} finally {
context.stop();
}
}
public abstract Optional<E> transform(ResultSet resultSet);
public Optional<E> sync(UnitOfWork<?> uow) throws TimeoutException {
if (uow == null) return sync();
public PreparedOptionalOperation<E> prepare() {
return new PreparedOptionalOperation<E>(prepareStatement(), this);
}
final Timer.Context context = requestLatency.time();
try {
public ListenableFuture<PreparedOptionalOperation<E>> prepareAsync() {
final O _this = (O) this;
return Futures.transform(
prepareStatementAsync(),
new Function<PreparedStatement, PreparedOptionalOperation<E>>() {
@Override
public PreparedOptionalOperation<E> apply(PreparedStatement preparedStatement) {
return new PreparedOptionalOperation<E>(preparedStatement, _this);
}
});
}
Optional<E> result = Optional.empty();
E cacheResult = null;
String[] statementKeys = null;
public Optional<E> sync() throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, false);
return transform(resultSet);
} finally {
context.stop();
if (enableCache) {
Set<BoundFacet> facets = bindFacetValues();
statementKeys = getQueryKeys();
cacheResult = checkCache(uow, facets, statementKeys);
if (cacheResult != null) {
result = Optional.of(cacheResult);
}
}
}
public Optional<E> sync(UnitOfWork<?> uow) throws TimeoutException {
if (uow == null)
if (!result.isPresent()) {
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet =
execute(
sessionOps,
uow,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
true);
// Transform the query result set into the desired shape.
result = transform(resultSet);
}
// If we have a result, it wasn't from cache, and we're caching things then we need to put this result
// into the cache for future requests to find.
if (enableCache && cacheResult == null && result.isPresent()) {
updateCache(uow, result.get(), getIdentifyingFacets(), statementKeys);
}
return result;
} finally {
context.stop();
}
}
public CompletableFuture<Optional<E>> async() {
return CompletableFuture.<Optional<E>>supplyAsync(
() -> {
try {
return sync();
final Timer.Context context = requestLatency.time();
try {
Optional<E> result = Optional.empty();
String[] statementKeys = null;
if (enableCache) {
Set<EntityIdentifyingFacet> facets = getFacets();
statementKeys = getQueryKeys();
result = Optional.of(checkCache(uow, facets, statementKeys));
}
if (!result.isPresent()) {
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
// Transform the query result set into the desired shape.
result = transform(resultSet);
}
// If we have a result and we're caching then we need to put it into the cache for future requests to find.
if (enableCache && result.isPresent()) {
updateCache(uow, result.get(), statementKeys);
}
return result;
} finally {
context.stop();
}
}
public CompletableFuture<Optional<E>> async() {
return CompletableFuture.<Optional<E>>supplyAsync(() -> {
try {
return sync();
}
catch (TimeoutException ex) {
throw new CompletionException(ex);
}
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
}
public CompletableFuture<Optional<E>> async(UnitOfWork<?> uow) {
if (uow == null)
return async();
return CompletableFuture.<Optional<E>>supplyAsync(() -> {
try {
return sync();
}
catch (TimeoutException ex) {
throw new CompletionException(ex);
}
public CompletableFuture<Optional<E>> async(UnitOfWork<?> uow) {
if (uow == null) return async();
return CompletableFuture.<Optional<E>>supplyAsync(
() -> {
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
}
}

View file

@ -27,358 +27,377 @@ import com.datastax.driver.core.policies.FallthroughRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.Helenus;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.BoundFacet;
import net.helenus.core.cache.EntityIdentifyingFacet;
import net.helenus.mapping.HelenusProperty;
import net.helenus.core.reflect.MapExportable;
import net.helenus.mapping.value.BeanColumnValueProvider;
import net.helenus.support.Either;
import net.helenus.support.HelenusException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>>
extends Operation<E> {
extends Operation<E> {
final Logger logger = LoggerFactory.getLogger(getClass());
protected boolean enableCache = true;
protected boolean showValues = true;
protected TraceContext traceContext;
long queryExecutionTimeout = 10;
TimeUnit queryTimeoutUnits = TimeUnit.SECONDS;
private ConsistencyLevel consistencyLevel;
private ConsistencyLevel serialConsistencyLevel;
private RetryPolicy retryPolicy;
private boolean idempotent = false;
private boolean enableTracing = false;
private long[] defaultTimestamp = null;
private int[] fetchSize = null;
public AbstractStatementOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
this.consistencyLevel = sessionOperations.getDefaultConsistencyLevel();
this.idempotent = sessionOperations.getDefaultQueryIdempotency();
final Logger logger = LoggerFactory.getLogger(getClass());
protected boolean enableCache = true;
protected boolean showValues = true;
protected TraceContext traceContext;
long queryExecutionTimeout = 10;
TimeUnit queryTimeoutUnits = TimeUnit.SECONDS;
private ConsistencyLevel consistencyLevel;
private ConsistencyLevel serialConsistencyLevel;
private RetryPolicy retryPolicy;
private boolean idempotent = false;
private boolean enableTracing = false;
private long[] defaultTimestamp = null;
private int[] fetchSize = null;
public AbstractStatementOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
this.consistencyLevel = sessionOperations.getDefaultConsistencyLevel();
this.idempotent = sessionOperations.getDefaultQueryIdempotency();
}
public abstract Statement buildStatement(boolean cached);
public O ignoreCache(boolean enabled) {
enableCache = enabled;
return (O) this;
}
public O ignoreCache() {
enableCache = true;
return (O) this;
}
public O showValues(boolean enabled) {
this.showValues = enabled;
return (O) this;
}
public O defaultTimestamp(long timestamp) {
this.defaultTimestamp = new long[1];
this.defaultTimestamp[0] = timestamp;
return (O) this;
}
public O retryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return (O) this;
}
public O defaultRetryPolicy() {
this.retryPolicy = DefaultRetryPolicy.INSTANCE;
return (O) this;
}
public O idempotent() {
this.idempotent = true;
return (O) this;
}
public O isIdempotent(boolean idempotent) {
this.idempotent = idempotent;
return (O) this;
}
public O downgradingConsistencyRetryPolicy() {
this.retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
return (O) this;
}
public O fallthroughRetryPolicy() {
this.retryPolicy = FallthroughRetryPolicy.INSTANCE;
return (O) this;
}
public O consistency(ConsistencyLevel level) {
this.consistencyLevel = level;
return (O) this;
}
public O consistencyAny() {
this.consistencyLevel = ConsistencyLevel.ANY;
return (O) this;
}
public O consistencyOne() {
this.consistencyLevel = ConsistencyLevel.ONE;
return (O) this;
}
public O consistencyQuorum() {
this.consistencyLevel = ConsistencyLevel.QUORUM;
return (O) this;
}
public O consistencyAll() {
this.consistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O consistencyLocalOne() {
this.consistencyLevel = ConsistencyLevel.LOCAL_ONE;
return (O) this;
}
public O consistencyLocalQuorum() {
this.consistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
return (O) this;
}
public O consistencyEachQuorum() {
this.consistencyLevel = ConsistencyLevel.EACH_QUORUM;
return (O) this;
}
public O serialConsistency(ConsistencyLevel level) {
this.serialConsistencyLevel = level;
return (O) this;
}
public O serialConsistencyAny() {
this.serialConsistencyLevel = ConsistencyLevel.ANY;
return (O) this;
}
public O serialConsistencyOne() {
this.serialConsistencyLevel = ConsistencyLevel.ONE;
return (O) this;
}
public O serialConsistencyQuorum() {
this.serialConsistencyLevel = ConsistencyLevel.QUORUM;
return (O) this;
}
public O serialConsistencyAll() {
this.serialConsistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O serialConsistencyLocal() {
this.serialConsistencyLevel = ConsistencyLevel.LOCAL_SERIAL;
return (O) this;
}
public O serialConsistencyLocalQuorum() {
this.serialConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
return (O) this;
}
public O disableTracing() {
this.enableTracing = false;
return (O) this;
}
public O enableTracing() {
this.enableTracing = true;
return (O) this;
}
public O tracing(boolean enable) {
this.enableTracing = enable;
return (O) this;
}
public O fetchSize(int fetchSize) {
this.fetchSize = new int[1];
this.fetchSize[0] = fetchSize;
return (O) this;
}
public O queryTimeoutMs(long ms) {
this.queryExecutionTimeout = ms;
this.queryTimeoutUnits = TimeUnit.MILLISECONDS;
return (O) this;
}
public O queryTimeout(long timeout, TimeUnit units) {
this.queryExecutionTimeout = timeout;
this.queryTimeoutUnits = units;
return (O) this;
}
public Statement options(Statement statement) {
if (defaultTimestamp != null) {
statement.setDefaultTimestamp(defaultTimestamp[0]);
}
public abstract Statement buildStatement(boolean cached);
public O ignoreCache(boolean enabled) {
enableCache = enabled;
return (O) this;
if (consistencyLevel != null) {
statement.setConsistencyLevel(consistencyLevel);
}
public O ignoreCache() {
enableCache = true;
return (O) this;
if (serialConsistencyLevel != null) {
statement.setSerialConsistencyLevel(serialConsistencyLevel);
}
public O showValues(boolean enabled) {
this.showValues = enabled;
return (O) this;
if (retryPolicy != null) {
statement.setRetryPolicy(retryPolicy);
}
public O defaultTimestamp(long timestamp) {
this.defaultTimestamp = new long[1];
this.defaultTimestamp[0] = timestamp;
return (O) this;
if (enableTracing) {
statement.enableTracing();
} else {
statement.disableTracing();
}
public O retryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return (O) this;
if (fetchSize != null) {
statement.setFetchSize(fetchSize[0]);
}
public O defaultRetryPolicy() {
this.retryPolicy = DefaultRetryPolicy.INSTANCE;
return (O) this;
if (idempotent) {
statement.setIdempotent(true);
}
public O idempotent() {
this.idempotent = true;
return (O) this;
return statement;
}
public O zipkinContext(TraceContext traceContext) {
if (traceContext != null) {
Tracer tracer = this.sessionOps.getZipkinTracer();
if (tracer != null) {
this.traceContext = traceContext;
}
}
public O isIdempotent(boolean idempotent) {
this.idempotent = idempotent;
return (O) this;
return (O) this;
}
public Statement statement() {
return buildStatement(false);
}
public String cql() {
Statement statement = buildStatement(false);
if (statement == null) return "";
if (statement instanceof BuiltStatement) {
BuiltStatement buildStatement = (BuiltStatement) statement;
return buildStatement.setForceNoValues(true).getQueryString();
} else {
return statement.toString();
}
}
public PreparedStatement prepareStatement() {
Statement statement = buildStatement(true);
if (statement instanceof RegularStatement) {
RegularStatement regularStatement = (RegularStatement) statement;
return sessionOps.prepare(regularStatement);
}
public O downgradingConsistencyRetryPolicy() {
this.retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
return (O) this;
throw new HelenusException("only RegularStatements can be prepared");
}
public ListenableFuture<PreparedStatement> prepareStatementAsync() {
Statement statement = buildStatement(true);
if (statement instanceof RegularStatement) {
RegularStatement regularStatement = (RegularStatement) statement;
return sessionOps.prepareAsync(regularStatement);
}
public O fallthroughRetryPolicy() {
this.retryPolicy = FallthroughRetryPolicy.INSTANCE;
return (O) this;
}
throw new HelenusException("only RegularStatements can be prepared");
}
public O consistency(ConsistencyLevel level) {
this.consistencyLevel = level;
return (O) this;
}
protected E checkCache(UnitOfWork<?> uow, Set<BoundFacet> facets, String[] statementKeys) {
E result = null;
Optional<Either<Object, Set<Object>>> optionalCachedResult = Optional.empty();
public O consistencyAny() {
this.consistencyLevel = ConsistencyLevel.ANY;
return (O) this;
}
public O consistencyOne() {
this.consistencyLevel = ConsistencyLevel.ONE;
return (O) this;
}
public O consistencyQuorum() {
this.consistencyLevel = ConsistencyLevel.QUORUM;
return (O) this;
}
public O consistencyAll() {
this.consistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O consistencyLocalOne() {
this.consistencyLevel = ConsistencyLevel.LOCAL_ONE;
return (O) this;
}
public O consistencyLocalQuorum() {
this.consistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
return (O) this;
}
public O consistencyEachQuorum() {
this.consistencyLevel = ConsistencyLevel.EACH_QUORUM;
return (O) this;
}
public O serialConsistency(ConsistencyLevel level) {
this.serialConsistencyLevel = level;
return (O) this;
}
public O serialConsistencyAny() {
this.serialConsistencyLevel = ConsistencyLevel.ANY;
return (O) this;
}
public O serialConsistencyOne() {
this.serialConsistencyLevel = ConsistencyLevel.ONE;
return (O) this;
}
public O serialConsistencyQuorum() {
this.serialConsistencyLevel = ConsistencyLevel.QUORUM;
return (O) this;
}
public O serialConsistencyAll() {
this.serialConsistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O serialConsistencyLocal() {
this.serialConsistencyLevel = ConsistencyLevel.LOCAL_SERIAL;
return (O) this;
}
public O serialConsistencyLocalQuorum() {
this.serialConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
return (O) this;
}
public O disableTracing() {
this.enableTracing = false;
return (O) this;
}
public O enableTracing() {
this.enableTracing = true;
return (O) this;
}
public O tracing(boolean enable) {
this.enableTracing = enable;
return (O) this;
}
public O fetchSize(int fetchSize) {
this.fetchSize = new int[1];
this.fetchSize[0] = fetchSize;
return (O) this;
}
public O queryTimeoutMs(long ms) {
this.queryExecutionTimeout = ms;
this.queryTimeoutUnits = TimeUnit.MILLISECONDS;
return (O) this;
}
public O queryTimeout(long timeout, TimeUnit units) {
this.queryExecutionTimeout = timeout;
this.queryTimeoutUnits = units;
return (O) this;
}
public Statement options(Statement statement) {
if (defaultTimestamp != null) {
statement.setDefaultTimestamp(defaultTimestamp[0]);
if (!facets.isEmpty()) {
//TODO(gburd): what about select ResultSet, Tuple... etc.?
optionalCachedResult = uow.cacheLookupByFacet(facets);
if (optionalCachedResult.isPresent()) {
Either<Object, Set<Object>> eitherCachedResult = optionalCachedResult.get();
if (eitherCachedResult.isLeft()) {
uowCacheHits.mark();
logger.info("UnitOfWork({}) cache hit using facets", uow.hashCode());
result = (E) eitherCachedResult.getLeft();
}
if (consistencyLevel != null) {
statement.setConsistencyLevel(consistencyLevel);
}
if (serialConsistencyLevel != null) {
statement.setSerialConsistencyLevel(serialConsistencyLevel);
}
if (retryPolicy != null) {
statement.setRetryPolicy(retryPolicy);
}
if (enableTracing) {
statement.enableTracing();
} else {
statement.disableTracing();
}
if (fetchSize != null) {
statement.setFetchSize(fetchSize[0]);
}
if (idempotent) {
statement.setIdempotent(true);
}
return statement;
}
}
public O zipkinContext(TraceContext traceContext) {
if (traceContext != null) {
Tracer tracer = this.sessionOps.getZipkinTracer();
if (tracer != null) {
this.traceContext = traceContext;
if (result == null && statementKeys != null) {
// Then check to see if this query happens to uniquely identify a single object in thecache.
optionalCachedResult = uow.cacheLookupByStatement(statementKeys);
if (optionalCachedResult.isPresent()) {
Either<Object, Set<Object>> eitherCachedResult = optionalCachedResult.get();
// Statements always store Set<E> as the value in the cache.
if (eitherCachedResult.isRight()) {
Set<Object> cachedResult = eitherCachedResult.getRight();
if (cachedResult.size() == 1) {
Optional<Object> maybeResult = cachedResult.stream().findFirst();
if (maybeResult.isPresent()) {
uowCacheHits.mark();
logger.info("UnitOfWork({}) cache hit for stmt", uow.hashCode());
} else {
result = null;
}
}
}
return (O) this;
}
}
public Statement statement() {
return buildStatement(false);
if (result == null) {
uowCacheMiss.mark();
logger.info("UnitOfWork({}) cache miss", uow.hashCode());
}
public String cql() {
Statement statement = buildStatement(false);
if (statement == null)
return "";
if (statement instanceof BuiltStatement) {
BuiltStatement buildStatement = (BuiltStatement) statement;
return buildStatement.setForceNoValues(true).getQueryString();
} else {
return statement.toString();
}
}
return result;
}
public PreparedStatement prepareStatement() {
protected void updateCache(
UnitOfWork<?> uow,
E pojo,
Map<String, EntityIdentifyingFacet> facetMap,
String[] statementKeys) {
Statement statement = buildStatement(true);
if (statement instanceof RegularStatement) {
RegularStatement regularStatement = (RegularStatement) statement;
return sessionOps.prepare(regularStatement);
}
throw new HelenusException("only RegularStatements can be prepared");
}
public ListenableFuture<PreparedStatement> prepareStatementAsync() {
Statement statement = buildStatement(true);
if (statement instanceof RegularStatement) {
RegularStatement regularStatement = (RegularStatement) statement;
return sessionOps.prepareAsync(regularStatement);
}
throw new HelenusException("only RegularStatements can be prepared");
}
protected E checkCache(UnitOfWork<?> uow, Set<EntityIdentifyingFacet> facets, String[] statementKeys) {
E result = null;
if (!facets.isEmpty()) {
//TODO(gburd): what about select ResultSet, Tuple... etc.?
Optional<Either<Object, Set<Object>>> optionalCachedResult = uow.cacheLookupByFacet(facets);
if (optionalCachedResult.isPresent()) {
Either<Object, Set<Object>> eitherCachedResult = optionalCachedResult.get();
if (eitherCachedResult.isLeft()) {
uowCacheHits.mark();
logger.info("UnitOfWork({}) cache hit using facets", uow.hashCode());
result = (E) eitherCachedResult.getLeft();
}
}
} else {
// Then check to see if this query happens to uniquely identify a single object in the
// cache.
Optional<Either<Object, Set<Object>>> optionalCachedResult = uow.cacheLookupByStatement(statementKeys);
if (optionalCachedResult.isPresent()) {
Either<Object, Set<Object>> eitherCachedResult = optionalCachedResult.get();
// Statements always store Set<E> as the value in the cache.
if (eitherCachedResult.isRight()) {
Set<Object> cachedResult = eitherCachedResult.getRight();
if (cachedResult.size() == 1) {
Optional<Object> maybeResult = cachedResult.stream().findFirst();
if (maybeResult.isPresent()) {
uowCacheHits.mark();
logger.info("UnitOfWork({}) cache hit for stmt", uow.hashCode());
} else {
result = null;
}
}
}
}
}
if (result == null) {
uowCacheMiss.mark();
logger.info("UnitOfWork({}) cache miss", uow.hashCode());
}
return result;
}
protected void updateCache(UnitOfWork<?> uow, E pojo, String[] statementKeys) {
// Insert this entity into the cache for each facet for this entity that we can fully bind.
Map<String, EntityIdentifyingFacet> facetMap = Helenus.entity(pojo.getClass()).getIdentifyingFacets();
facetMap.forEach((facetName, facet) -> {
if (!facet.isFullyBound()) {
HelenusProperty prop = facet.getProperty();
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop);
facet.setValueForProperty(prop, value);
}
// Insert this entity into the cache for each facet for this entity that we can fully bind.
Map<String, BoundFacet> boundFacets = new HashMap<String, BoundFacet>();
Map<String, Object> valueMap =
pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
facetMap.forEach(
(facetName, facet) -> {
if (!facet.isFullyBound()) {
EntityIdentifyingFacet.Binder binder = facet.binder();
facet
.getProperties()
.forEach(
prop -> {
if (valueMap == null) {
Object value =
BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(
prop, prop.getColumnName().toCql() + "==" + value.toString());
} else {
binder.setValueForProperty(
prop,
prop.getColumnName().toCql()
+ "=="
+ valueMap.get(prop.getPropertyName()).toString());
}
});
boundFacets.put(facetName, binder.bind());
}
});
// Cache the value (pojo), the statement key, and the fully bound facets.
if (statementKeys != null) {
uow.cacheUpdate(Either.left(pojo), statementKeys,
facetMap.values()
.stream()
.filter(facet -> facet.isFullyBound())
.collect(Collectors.toSet()));
}
}
// Cache the value (pojo), the statement key, and the fully bound facets.
uow.cacheUpdate(Either.left(pojo), statementKeys, boundFacets);
}
}

View file

@ -21,15 +21,14 @@ import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.BoundFacet;
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
extends AbstractStatementOperation<E, O> {
@ -59,7 +58,15 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public Stream<E> sync() throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
ResultSet resultSet =
this.execute(
sessionOps,
null,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
false);
return transform(resultSet);
} finally {
context.stop();
@ -67,34 +74,40 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
}
public Stream<E> sync(UnitOfWork<?> uow) throws TimeoutException {
if (uow == null)
return sync();
if (uow == null) return sync();
final Timer.Context context = requestLatency.time();
try {
Stream<E> result = null;
E cachedResult = null;
String[] statementKeys = null;
Stream<E> result = null;
E cachedResult = null;
String[] statementKeys = null;
if (enableCache) {
Set<EntityIdentifyingFacet> facets = getFacets();
statementKeys = getQueryKeys();
cachedResult = checkCache(uow, facets, statementKeys);
if (cachedResult != null) {
result = Stream.of(cachedResult);
}
if (enableCache) {
Set<BoundFacet> facets = bindFacetValues();
statementKeys = getQueryKeys();
cachedResult = checkCache(uow, facets, statementKeys);
if (cachedResult != null) {
result = Stream.of(cachedResult);
}
}
if (result == null) {
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
result = transform(resultSet);
}
if (result == null) {
ResultSet resultSet =
execute(
sessionOps,
uow,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
true);
result = transform(resultSet);
}
// If we have a result and we're caching then we need to put it into the cache for future requests to find.
if (enableCache && cachedResult != null) {
updateCache(uow, cachedResult, statementKeys);
}
// If we have a result and we're caching then we need to put it into the cache for future requests to find.
if (enableCache && cachedResult != null) {
updateCache(uow, cachedResult, getIdentifyingFacets(), statementKeys);
}
return result;
} finally {
@ -103,19 +116,25 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
}
public CompletableFuture<Stream<E>> async() {
return CompletableFuture.<Stream<E>>supplyAsync(() -> {
try {
return CompletableFuture.<Stream<E>>supplyAsync(
() -> {
try {
return sync();
} catch (TimeoutException ex) { throw new CompletionException(ex); }
});
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
public CompletableFuture<Stream<E>> async(UnitOfWork<?> uow) {
if (uow == null) return async();
return CompletableFuture.<Stream<E>>supplyAsync(() -> {
try {
return CompletableFuture.<Stream<E>>supplyAsync(
() -> {
try {
return sync();
} catch (TimeoutException ex) { throw new CompletionException(ex); }
});
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
}

View file

@ -18,10 +18,9 @@ package net.helenus.core.operation;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Set;
import java.util.stream.Stream;
import net.helenus.core.cache.BoundFacet;
public final class BoundStreamOperation<E>
extends AbstractStreamOperation<E, BoundStreamOperation<E>> {
@ -42,7 +41,9 @@ public final class BoundStreamOperation<E>
}
@Override
public Set<EntityIdentifyingFacet> getFacets() { return delegate.getFacets(); }
public Set<BoundFacet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override
public Stream<E> transform(ResultSet resultSet) {

View file

@ -19,7 +19,6 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.base.Joiner;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
@ -28,6 +27,7 @@ import net.helenus.core.Getter;
import net.helenus.core.Helenus;
import net.helenus.core.UnitOfWork;
import net.helenus.core.reflect.DefaultPrimitiveTypes;
import net.helenus.core.reflect.Drafted;
import net.helenus.core.reflect.HelenusPropertyNode;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusProperty;
@ -170,6 +170,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
Class<?> iface = entity.getMappingInterface();
if (resultType == iface) {
if (values.size() > 0) {
boolean immutable = iface.isAssignableFrom(Drafted.class);
Collection<HelenusProperty> properties = entity.getOrderedProperties();
Map<String, Object> backingMap = new HashMap<String, Object>(properties.size());
@ -190,7 +191,8 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
} else {
// If we started this operation with an instance of this type, use values from that.
if (pojo != null) {
backingMap.put(key, BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop));
backingMap.put(
key, BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, immutable));
} else {
// Otherwise we'll use default values for the property type if available.
Class<?> propType = prop.getJavaType();
@ -248,7 +250,8 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
switch (prop.getProperty().getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
keys.add(entity.getName().toCql() + '.' + prop.getColumnName() + "==" + t._2.toString());
keys.add(
entity.getName().toCql() + '.' + prop.getColumnName() + "==" + t._2.toString());
break;
default:
break;
@ -265,7 +268,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
T result = super.sync(uow);
Class<?> iface = entity.getMappingInterface();
if (resultType == iface) {
updateCache(uow, result, getQueryKeys());
updateCache(uow, result, entity.getIdentifyingFacets(), getQueryKeys());
}
return result;
}

View file

@ -1,3 +1,18 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.operation;
import brave.Span;
@ -9,15 +24,14 @@ import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.BoundFacet;
import net.helenus.core.cache.EntityIdentifyingFacet;
import net.helenus.support.HelenusException;
public abstract class Operation<E> {
@ -41,7 +55,8 @@ public abstract class Operation<E> {
long timeout,
TimeUnit units,
boolean showValues,
boolean cached) throws TimeoutException {
boolean cached)
throws TimeoutException {
// Start recording in a Zipkin sub-span our execution time to perform this operation.
Tracer tracer = session.getZipkinTracer();
@ -52,14 +67,14 @@ public abstract class Operation<E> {
try {
if (span != null) {
span.name("cassandra");
span.start();
}
if (span != null) {
span.name("cassandra");
span.start();
}
Statement statement = options(buildStatement(cached));
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return futureResultSet.getUninterruptibly(timeout, units);
Statement statement = options(buildStatement(cached));
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return futureResultSet.getUninterruptibly(timeout, units);
} finally {
if (span != null) {
@ -77,11 +92,14 @@ public abstract class Operation<E> {
}
public String[] getQueryKeys() {
return null;
}
public Set<EntityIdentifyingFacet> getFacets() {
return null;
}
public Map<String, EntityIdentifyingFacet> getIdentifyingFacets() {
return null;
}
public Set<BoundFacet> bindFacetValues() {
return null;
}
}

View file

@ -17,11 +17,12 @@ package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import net.helenus.core.cache.BoundFacet;
import net.helenus.core.cache.EntityIdentifyingFacet;
public final class SelectFirstOperation<E>
extends AbstractFilterOptionalOperation<E, SelectFirstOperation<E>> {
@ -40,18 +41,25 @@ public final class SelectFirstOperation<E>
return new SelectFirstTransformingOperation<R, E>(delegate, fn);
}
@Override
public String[] getQueryKeys() {
return delegate.getQueryKeys();
}
@Override
public BuiltStatement buildStatement(boolean cached) {
return delegate.buildStatement(cached);
}
@Override
public Set<EntityIdentifyingFacet> getFacets() { return delegate.getFacets(); }
public String[] getQueryKeys() {
return delegate.getQueryKeys();
}
@Override
public Map<String, EntityIdentifyingFacet> getIdentifyingFacets() {
return delegate.getIdentifyingFacets();
}
@Override
public Set<BoundFacet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override
public Optional<E> transform(ResultSet resultSet) {

View file

@ -17,11 +17,10 @@ package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import net.helenus.core.cache.BoundFacet;
public final class SelectFirstTransformingOperation<R, E>
extends AbstractFilterOptionalOperation<R, SelectFirstTransformingOperation<R, E>> {
@ -44,7 +43,9 @@ public final class SelectFirstTransformingOperation<R, E>
}
@Override
public Set<EntityIdentifyingFacet> getFacets() { return delegate.getFacets(); }
public Set<BoundFacet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override
public BuiltStatement buildStatement(boolean cached) {

View file

@ -23,18 +23,16 @@ import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.querybuilder.Select.Selection;
import com.datastax.driver.core.querybuilder.Select.Where;
import com.google.common.collect.Iterables;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import com.google.common.collect.Iterables;
import net.helenus.core.*;
import net.helenus.core.cache.BoundFacet;
import net.helenus.core.cache.EntityIdentifyingFacet;
import net.helenus.core.reflect.HelenusPropertyNode;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusProperty;
import net.helenus.mapping.MappingUtil;
import net.helenus.mapping.OrderingDirection;
import net.helenus.mapping.value.ColumnValueProvider;
@ -197,27 +195,48 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
}
@Override
public Set<EntityIdentifyingFacet> getFacets() {
public String[] getQueryKeys() {
int i = 0;
String[] keys = new String[filters.size()];
HelenusEntity entity = props.get(0).getEntity();
final Set<EntityIdentifyingFacet> facets = new HashSet<>(filters.size());
String entityName = entity.getName().toCql();
for (Filter<?> filter : filters.values()) {
keys[i++] = entityName + '.' + filter.toString();
}
return keys;
}
@Override
public Map<String, EntityIdentifyingFacet> getIdentifyingFacets() {
HelenusEntity entity = props.get(0).getEntity();
return entity.getIdentifyingFacets();
}
@Override
public Set<BoundFacet> bindFacetValues() {
HelenusEntity entity = props.get(0).getEntity();
Set<BoundFacet> boundFacets = new HashSet<BoundFacet>();
// Check to see if this select statement has enough information to build one or
// more identifying facets.
entity
.getIdentifyingFacets()
.forEach(
(facetName, facet) -> {
if (facet.isFullyBound()) {
facets.add(facet);
} else {
HelenusProperty prop = facet.getProperty();
Filter filter = filters.get(prop);
if (filter != null) {
facet.setValueForProperty(prop, filter.toString());
facets.add(facet);
}
EntityIdentifyingFacet.Binder binder = facet.binder();
facet
.getProperties()
.forEach(
prop -> {
Filter filter = filters.get(prop);
if (filter != null) {
binder.setValueForProperty(prop, filter.toString());
}
});
if (binder.isFullyBound()) {
boundFacets.add(binder.bind());
}
});
return facets;
return boundFacets;
}
@Override
@ -262,7 +281,6 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
break;
}
}
}
if (entity == null) {

View file

@ -17,11 +17,12 @@ package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import net.helenus.core.cache.EntityIdentifyingFacet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
import net.helenus.core.cache.BoundFacet;
import net.helenus.core.cache.EntityIdentifyingFacet;
public final class SelectTransformingOperation<R, E>
extends AbstractFilterStreamOperation<R, SelectTransformingOperation<R, E>> {
@ -44,7 +45,14 @@ public final class SelectTransformingOperation<R, E>
}
@Override
public Set<EntityIdentifyingFacet> getFacets() { return delegate.getFacets(); }
public Set<BoundFacet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override
public Map<String, EntityIdentifyingFacet> getIdentifyingFacets() {
return delegate.getIdentifyingFacets();
}
@Override
public BuiltStatement buildStatement(boolean cached) {

View file

@ -586,7 +586,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
E result = super.sync(uow);
// TODO(gburd): Only drafted entity objects are updated in the cache at this time.
if (draft != null) {
updateCache(uow, result, getQueryKeys());
updateCache(uow, result, getIdentifyingFacets(), getQueryKeys());
}
return result;
}

View file

@ -1,3 +1,18 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.reflect;
import java.util.Set;

View file

@ -20,13 +20,13 @@ import net.helenus.mapping.HelenusEntity;
public interface DslExportable {
public static final String GET_ENTITY_METHOD = "getHelenusMappingEntity";
public static final String GET_PARENT_METHOD = "getParentDslHelenusPropertyNode";
public static final String SET_METADATA_METHOD = "setCassandraMetadataForHelenusSesion";
String GET_ENTITY_METHOD = "getHelenusMappingEntity";
String GET_PARENT_METHOD = "getParentDslHelenusPropertyNode";
String SET_METADATA_METHOD = "setCassandraMetadataForHelenusSession";
HelenusEntity getHelenusMappingEntity();
HelenusPropertyNode getParentDslHelenusPropertyNode();
void setCassandraMetadataForHelenusSesion(Metadata metadata);
void setCassandraMetadataForHelenusSession(Metadata metadata);
}

View file

@ -59,7 +59,7 @@ public class DslInvocationHandler<E> implements InvocationHandler {
this.classLoader = classLoader;
}
public void setCassandraMetadataForHelenusSesion(Metadata metadata) {
public void setCassandraMetadataForHelenusSession(Metadata metadata) {
if (metadata != null) {
this.metadata = metadata;
entity = init(metadata);
@ -130,7 +130,7 @@ public class DslInvocationHandler<E> implements InvocationHandler {
&& args.length == 1
&& args[0] instanceof Metadata) {
if (metadata == null) {
this.setCassandraMetadataForHelenusSesion((Metadata) args[0]);
this.setCassandraMetadataForHelenusSession((Metadata) args[0]);
}
return null;
}

View file

@ -75,7 +75,12 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
return false;
}
if (Proxy.isProxyClass(otherObj.getClass())) {
return this == Proxy.getInvocationHandler(otherObj);
if (this == Proxy.getInvocationHandler(otherObj)) {
return true;
}
}
if (otherObj instanceof MapExportable && src.equals(((MapExportable) otherObj).toMap())) {
return true;
}
return false;
}

View file

@ -123,16 +123,15 @@ public final class HelenusMappingEntity implements HelenusEntity {
default:
if (primaryProperties != null) {
primaryFacet =
new EntityIdentifyingFacet(
primaryProperties.toArray(new HelenusProperty[props.size()]));
new EntityIdentifyingFacet(new HashSet<HelenusProperty>(primaryProperties));
allFacetsBuilder.put("*", primaryFacet);
primaryProperties = null;
}
Optional<IdentityName> optionalIndexName = prop.getIndexName();
if (optionalIndexName.isPresent()) {
EntityIdentifyingFacet facet =
new EntityIdentifyingFacet(prop);
EntityIdentifyingFacet facet = new EntityIdentifyingFacet(prop);
ancillaryFacetsBuilder.put(prop.getPropertyName(), facet);
allFacetsBuilder.put(prop.getPropertyName(), facet);
}
}
}

View file

@ -25,7 +25,8 @@ public enum BeanColumnValueProvider implements ColumnValueProvider {
INSTANCE;
@Override
public <V> V getColumnValue(Object bean, int columnIndexUnused, HelenusProperty property) {
public <V> V getColumnValue(
Object bean, int columnIndexUnused, HelenusProperty property, boolean immutable) {
Method getter = property.getGetterMethod();

View file

@ -22,7 +22,11 @@ import net.helenus.mapping.HelenusProperty;
public interface ColumnValueProvider {
<V> V getColumnValue(Object source, int columnIndex, HelenusProperty property);
<V> V getColumnValue(Object source, int columnIndex, HelenusProperty property, boolean immutable);
default <V> V getColumnValue(Object source, int columnIndex, HelenusProperty property) {
return getColumnValue(source, columnIndex, property, false);
}
default <T> TypeCodec<T> codecFor(DataType type) {
return CodecRegistry.DEFAULT_INSTANCE.codecFor(type);

View file

@ -16,9 +16,14 @@
package net.helenus.mapping.value;
import com.datastax.driver.core.*;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import net.helenus.core.SessionRepository;
import net.helenus.mapping.HelenusProperty;
@ -32,15 +37,16 @@ public final class RowColumnValueProvider implements ColumnValueProvider {
}
@Override
public <V> V getColumnValue(Object sourceObj, int columnIndex, HelenusProperty property) {
public <V> V getColumnValue(
Object sourceObj, int columnIndex, HelenusProperty property, boolean immutable) {
Row source = (Row) sourceObj;
Object value = null;
if (columnIndex != -1) {
value = readValueByIndex(source, columnIndex);
value = readValueByIndex(source, columnIndex, immutable);
} else {
value = readValueByName(source, property.getColumnName().getName());
value = readValueByName(source, property.getColumnName().getName(), immutable);
}
if (value != null) {
@ -55,7 +61,7 @@ public final class RowColumnValueProvider implements ColumnValueProvider {
return (V) value;
}
private Object readValueByIndex(Row source, int columnIndex) {
private Object readValueByIndex(Row source, int columnIndex, boolean immutable) {
if (source.isNull(columnIndex)) {
return null;
@ -71,14 +77,18 @@ public final class RowColumnValueProvider implements ColumnValueProvider {
switch (columnType.getName()) {
case SET:
return source.getSet(columnIndex, codecFor(typeArguments.get(0)).getJavaType());
Set set = source.getSet(columnIndex, codecFor(typeArguments.get(0)).getJavaType());
return immutable ? ImmutableSet.copyOf(set) : set;
case MAP:
return source.getMap(
columnIndex,
codecFor(typeArguments.get(0)).getJavaType(),
codecFor(typeArguments.get(1)).getJavaType());
Map map =
source.getMap(
columnIndex,
codecFor(typeArguments.get(0)).getJavaType(),
codecFor(typeArguments.get(1)).getJavaType());
return immutable ? ImmutableMap.copyOf(map) : map;
case LIST:
return source.getList(columnIndex, codecFor(typeArguments.get(0)).getJavaType());
List list = source.getList(columnIndex, codecFor(typeArguments.get(0)).getJavaType());
return immutable ? ImmutableList.copyOf(list) : list;
}
}
@ -88,7 +98,7 @@ public final class RowColumnValueProvider implements ColumnValueProvider {
return value;
}
private Object readValueByName(Row source, String columnName) {
private Object readValueByName(Row source, String columnName, boolean immutable) {
if (source.isNull(columnName)) {
return null;
@ -104,14 +114,18 @@ public final class RowColumnValueProvider implements ColumnValueProvider {
switch (columnType.getName()) {
case SET:
return source.getSet(columnName, codecFor(typeArguments.get(0)).getJavaType());
Set set = source.getSet(columnName, codecFor(typeArguments.get(0)).getJavaType());
return immutable ? ImmutableSet.copyOf(set) : set;
case MAP:
return source.getMap(
columnName,
codecFor(typeArguments.get(0)).getJavaType(),
codecFor(typeArguments.get(1)).getJavaType());
Map map =
source.getMap(
columnName,
codecFor(typeArguments.get(0)).getJavaType(),
codecFor(typeArguments.get(1)).getJavaType());
return immutable ? ImmutableMap.copyOf(map) : map;
case LIST:
return source.getList(columnName, codecFor(typeArguments.get(0)).getJavaType());
List list = source.getList(columnName, codecFor(typeArguments.get(0)).getJavaType());
return immutable ? ImmutableList.copyOf(list) : list;
}
}

View file

@ -34,7 +34,8 @@ public final class TupleColumnValueProvider implements ColumnValueProvider {
}
@Override
public <V> V getColumnValue(Object sourceObj, int columnIndexUnused, HelenusProperty property) {
public <V> V getColumnValue(
Object sourceObj, int columnIndexUnused, HelenusProperty property, boolean immutable) {
int columnIndex = property.getOrdinal();

View file

@ -35,7 +35,8 @@ public final class UDTColumnValueProvider implements ColumnValueProvider {
@Override
@SuppressWarnings("unchecked")
public <V> V getColumnValue(Object sourceObj, int columnIndexUnused, HelenusProperty property) {
public <V> V getColumnValue(
Object sourceObj, int columnIndexUnused, HelenusProperty property, boolean immutable) {
UDTValue source = (UDTValue) sourceObj;

View file

@ -19,6 +19,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import net.helenus.core.reflect.Drafted;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusProperty;
import net.helenus.support.HelenusMappingException;
@ -28,11 +29,13 @@ public final class ValueProviderMap implements Map<String, Object> {
private final Object source;
private final ColumnValueProvider valueProvider;
private final HelenusEntity entity;
private final boolean immutable;
public ValueProviderMap(Object source, ColumnValueProvider valueProvider, HelenusEntity entity) {
this.source = source;
this.valueProvider = valueProvider;
this.entity = entity;
this.immutable = entity.getMappingInterface().isAssignableFrom(Drafted.class);
}
@Override
@ -41,7 +44,7 @@ public final class ValueProviderMap implements Map<String, Object> {
String name = (String) key;
HelenusProperty prop = entity.getProperty(name);
if (prop != null) {
return valueProvider.getColumnValue(source, -1, prop);
return valueProvider.getColumnValue(source, -1, prop, immutable);
}
}
return null;
@ -126,4 +129,17 @@ public final class ValueProviderMap implements Map<String, Object> {
public String toString() {
return source.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || (!o.getClass().isAssignableFrom(Map.class) && getClass() != o.getClass()))
return false;
Map that = (Map) o;
if (this.size() != that.size()) return false;
for (String key : this.keySet()) if (!this.get(key).equals(that.get(key))) return false;
return true;
}
}

View file

@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;

View file

@ -17,6 +17,7 @@ package net.helenus.test.integration.core.counter;
import static net.helenus.core.Query.eq;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
@ -24,8 +25,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.TimeoutException;
public class CounterTest extends AbstractEmbeddedCassandraTest {
static Page page;

View file

@ -5,7 +5,6 @@ import static net.helenus.core.Query.eq;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;

View file

@ -15,6 +15,7 @@
*/
package net.helenus.test.integration.core.index;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.Query;
@ -23,8 +24,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.TimeoutException;
public class SecondaryIndexTest extends AbstractEmbeddedCassandraTest {
Book book;

View file

@ -20,7 +20,6 @@ import static net.helenus.core.Query.eq;
import com.datastax.driver.core.ResultSet;
import java.util.*;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.Operator;

View file

@ -15,6 +15,7 @@
*/
package net.helenus.test.integration.core.tuple;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.Query;
@ -23,8 +24,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.TimeoutException;
public class InnerTupleTest extends AbstractEmbeddedCassandraTest {
static PhotoAlbum photoAlbum;

View file

@ -20,6 +20,7 @@ import static net.helenus.core.Query.eq;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.TupleValue;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
@ -27,8 +28,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.TimeoutException;
public class TupleTest extends AbstractEmbeddedCassandraTest {
static Album album;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.tuplecollection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.tuplecollection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.tuplecollection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.tuplecollection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.tuplecollection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -21,7 +21,6 @@ import static net.helenus.core.Query.get;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.udtcollection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.udtcollection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -18,7 +18,6 @@ package net.helenus.test.integration.core.udtcollection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Query;
import org.junit.Assert;
import org.junit.Test;

View file

@ -21,7 +21,6 @@ import static net.helenus.core.Query.get;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Test;

View file

@ -25,6 +25,7 @@ import net.helenus.core.HelenusSession;
import net.helenus.core.UnitOfWork;
import net.helenus.core.annotation.Cacheable;
import net.helenus.mapping.annotation.Column;
import net.helenus.mapping.annotation.Index;
import net.helenus.mapping.annotation.PartitionKey;
import net.helenus.mapping.annotation.Table;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
@ -39,6 +40,7 @@ interface Widget {
UUID id();
@Column
@Index
String name();
}
@ -141,6 +143,41 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
});
}
}
@Test
public void testSelectViaIndexAfterSelect() throws Exception {
Widget w1, w2;
UUID key = UUIDs.timeBased();
try (UnitOfWork uow = session.begin()) {
// This should inserted Widget, but not cache it.
session
.<Widget>insert(widget)
.value(widget::id, key)
.value(widget::name, RandomString.make(20))
.sync();
// This should read from the database and return a Widget.
w1 =
session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null);
// This should read from the cache and get the same instance of a Widget.
w2 =
session
.<Widget>select(widget)
.where(widget::name, eq(w1.name()))
.single()
.sync(uow)
.orElse(null);
uow.commit()
.andThen(
() -> {
Assert.assertEquals(w1, w2);
});
}
}
/*
@Test
public void testSelectAfterInsertProperlyCachesEntity() throws Exception {

View file

@ -19,7 +19,6 @@ import com.google.common.collect.Sets;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.Query;

View file

@ -19,7 +19,6 @@ import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.Query;

View file

@ -21,7 +21,6 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
@ -60,15 +59,15 @@ public class MaterializedViewTest extends AbstractEmbeddedCassandraTest {
try {
session
.insert(cyclist)
.value(cyclist::cid, UUID.randomUUID())
.value(cyclist::age, 18)
.value(cyclist::birthday, dateFromString("1997-02-08"))
.value(cyclist::country, "Netherlands")
.value(cyclist::name, "Pascal EENKHOORN")
.sync();
.insert(cyclist)
.value(cyclist::cid, UUID.randomUUID())
.value(cyclist::age, 18)
.value(cyclist::birthday, dateFromString("1997-02-08"))
.value(cyclist::country, "Netherlands")
.value(cyclist::name, "Pascal EENKHOORN")
.sync();
} catch (TimeoutException e) {
}
catch (TimeoutException e) {}
}
@Test