Facet cache is working, able to fetch by non-primary key within UOW.
This commit is contained in:
parent
6e687a7e90
commit
1746691826
23 changed files with 248 additions and 314 deletions
|
@ -35,7 +35,6 @@
|
|||
<orderEntry type="library" name="Maven: org.springframework:spring-core:4.3.10.RELEASE" level="project" />
|
||||
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
|
||||
<orderEntry type="library" name="Maven: com.google.guava:guava:20.0" level="project" />
|
||||
<orderEntry type="library" name="Maven: org.ahocorasick:ahocorasick:0.4.0" level="project" />
|
||||
<orderEntry type="library" name="Maven: io.zipkin.java:zipkin:1.29.2" level="project" />
|
||||
<orderEntry type="library" name="Maven: io.zipkin.brave:brave:4.0.6" level="project" />
|
||||
<orderEntry type="library" name="Maven: io.zipkin.reporter:zipkin-reporter:0.6.12" level="project" />
|
||||
|
|
6
pom.xml
6
pom.xml
|
@ -148,12 +148,6 @@
|
|||
<version>20.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.ahocorasick</groupId>
|
||||
<artifactId>ahocorasick</artifactId>
|
||||
<version>0.4.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Metrics and tracing -->
|
||||
<dependency>
|
||||
<groupId>io.zipkin.java</groupId>
|
||||
|
|
|
@ -16,17 +16,13 @@
|
|||
package net.helenus.core;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.ahocorasick.trie.Emit;
|
||||
import org.ahocorasick.trie.Trie;
|
||||
|
||||
import com.diffplug.common.base.Errors;
|
||||
import com.google.common.collect.HashBasedTable;
|
||||
import com.google.common.collect.Table;
|
||||
import com.google.common.collect.TreeTraverser;
|
||||
|
||||
import net.helenus.core.cache.BoundFacet;
|
||||
import net.helenus.core.cache.Facet;
|
||||
import net.helenus.support.Either;
|
||||
|
||||
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
|
||||
public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfWork<E>, AutoCloseable {
|
||||
|
@ -34,11 +30,12 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
private final HelenusSession session;
|
||||
private final AbstractUnitOfWork<E> parent;
|
||||
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
|
||||
private final Map<String, Either<Object, Set<Object>>> cache = new HashMap<String, Either<Object, Set<Object>>>();
|
||||
private Trie cacheIndex = Trie.builder().ignoreOverlaps().build();
|
||||
private boolean aborted = false;
|
||||
private boolean committed = false;
|
||||
|
||||
// Cache:
|
||||
private final Table<String, String, Object> cache = HashBasedTable.create();
|
||||
|
||||
protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork<E> parent) {
|
||||
Objects.requireNonNull(session, "containing session cannot be null");
|
||||
|
||||
|
@ -68,62 +65,40 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
}
|
||||
|
||||
@Override
|
||||
public Optional<Either<Object, Set<Object>>> cacheLookupByFacet(Set<Facet> facets) {
|
||||
Optional<Either<Object, Set<Object>>> result = Optional.empty();
|
||||
Collection<Emit> emits = cacheIndex.parseText(
|
||||
String.join(" ", facets.stream().map(facet -> facet.toString()).collect(Collectors.toList())));
|
||||
for (Emit emit : emits) {
|
||||
// NOTE: rethink. should this match *all* facets? how do I know which emit
|
||||
// keyword is the primary key?
|
||||
String key = emit.getKeyword();
|
||||
result = cacheLookup(key);
|
||||
if (result.isPresent()) {
|
||||
return result;
|
||||
public Optional<Object> cacheLookup(List<Facet> facets) {
|
||||
Facet table = facets.remove(0);
|
||||
String tableName = table.value().toString();
|
||||
Optional<Object> result = Optional.empty();
|
||||
for (Facet facet : facets) {
|
||||
String columnName = facet.name() + "==" + facet.value();
|
||||
Object value = cache.get(tableName, columnName);
|
||||
if (value != null) {
|
||||
if (result.isPresent() && result.get() != value) {
|
||||
// One facet matched, but another did not.
|
||||
result = Optional.empty();
|
||||
break;
|
||||
} else {
|
||||
result = Optional.of(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!result.isPresent()) {
|
||||
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
|
||||
if (parent != null) {
|
||||
return parent.cacheLookupByFacet(facets);
|
||||
return parent.cacheLookup(facets);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Either<Object, Set<Object>>> cacheLookupByStatement(String[] statementKeys) {
|
||||
String key = String.join(",", statementKeys);
|
||||
return cacheLookup(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Either<Object, Set<Object>>> cacheLookup(String key) {
|
||||
Optional<Either<Object, Set<Object>>> result = (cache.containsKey(key))
|
||||
? Optional.of(cache.get(key))
|
||||
: Optional.empty();
|
||||
|
||||
if (!result.isPresent()) {
|
||||
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
|
||||
if (parent != null) {
|
||||
return parent.cacheLookup(key);
|
||||
}
|
||||
public void cacheUpdate(Object value, List<Facet> facets) {
|
||||
Facet table = facets.remove(0);
|
||||
String tableName = table.value().toString();
|
||||
for (Facet facet : facets) {
|
||||
String columnName = facet.name() + "==" + facet.value();
|
||||
cache.put(tableName, columnName, value);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cacheUpdate(Either<Object, Set<Object>> value, String[] statementKeys,
|
||||
Map<String, BoundFacet> facetMap) {
|
||||
String key = "CQL::" + String.join(",", statementKeys);
|
||||
cache.put(key, value);
|
||||
Trie.TrieBuilder builder = cacheIndex.builder().ignoreOverlaps();
|
||||
facetMap.forEach((facetName, facet) -> {
|
||||
builder.addKeyword(facet.toString());
|
||||
if (facetName.equals("*")) {
|
||||
cache.put(facet.toString(), value);
|
||||
}
|
||||
});
|
||||
cacheIndex = builder.build();
|
||||
}
|
||||
|
||||
private Iterator<AbstractUnitOfWork<E>> getChildNodes() {
|
||||
|
@ -161,15 +136,14 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
committed = true;
|
||||
aborted = false;
|
||||
|
||||
// TODO(gburd): union this cache with parent's (if there is a parent) or with
|
||||
// the session cache for all cacheable entities we currently hold
|
||||
|
||||
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
|
||||
|
||||
// Merge UOW cache into parent's cache.
|
||||
if (parent != null) {
|
||||
parent.assumeCache(cache, cacheIndex);
|
||||
}
|
||||
parent.mergeCache(cache);
|
||||
} // else {
|
||||
// TODO... merge into session cache objects marked cacheable
|
||||
// }
|
||||
|
||||
// Apply all post-commit functions for
|
||||
if (parent == null) {
|
||||
|
@ -197,35 +171,21 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
// cache.invalidateSince(txn::start time)
|
||||
}
|
||||
|
||||
private void assumeCache(Map<String, Either<Object, Set<Object>>> childCache, Trie childCacheIndex) {
|
||||
for (String key : childCache.keySet()) {
|
||||
if (cache.containsKey(key)) {
|
||||
Either<Object, Set<Object>> value = cache.get(key);
|
||||
if (value.isLeft()) {
|
||||
Object obj = value.getLeft();
|
||||
// merge objects
|
||||
Either<Object, Set<Object>> childValue = childCache.get(key);
|
||||
if (childValue.isLeft()) {
|
||||
Object childObj = childValue.getLeft();
|
||||
} else {
|
||||
Set<Object> childSet = childValue.getRight();
|
||||
}
|
||||
private void mergeCache(Table<String, String, Object> from) {
|
||||
Table<String, String, Object> to = this.cache;
|
||||
from.rowMap().forEach((rowKey, columnMap) -> {
|
||||
columnMap.forEach((columnKey, value) -> {
|
||||
if (to.contains(rowKey, columnKey)) {
|
||||
to.put(rowKey, columnKey, merge(to.get(rowKey, columnKey), from.get(rowKey, columnKey)));
|
||||
} else {
|
||||
// merge the sets
|
||||
Set<Object> set = value.getRight();
|
||||
Either<Object, Set<Object>> childValue = childCache.get(key);
|
||||
if (childValue.isLeft()) {
|
||||
Object childObj = childValue.getLeft();
|
||||
set.add(childObj);
|
||||
} else {
|
||||
Set<Object> childSet = childValue.getRight();
|
||||
set.addAll(childSet);
|
||||
}
|
||||
to.put(rowKey, columnKey, from.get(rowKey, columnKey));
|
||||
}
|
||||
} else {
|
||||
cache.put(key, childCache.get(key));
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private Object merge(Object to, Object from) {
|
||||
return to; // TODO(gburd): yeah...
|
||||
}
|
||||
|
||||
public String describeConflicts() {
|
||||
|
|
|
@ -105,6 +105,10 @@ public final class Filter<V> {
|
|||
return new Filter<V>(node, postulate);
|
||||
}
|
||||
|
||||
public V[] postulateValues() {
|
||||
return postulate.values();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return node.getColumnName() + postulate.toString();
|
||||
|
|
|
@ -71,6 +71,10 @@ public final class Postulate<V> {
|
|||
}
|
||||
}
|
||||
|
||||
public V[] values() {
|
||||
return values;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
|
||||
|
|
|
@ -140,6 +140,28 @@ public final class SchemaUtil {
|
|||
return SchemaBuilder.dropType(type.getTypeName()).ifExists();
|
||||
}
|
||||
|
||||
public static String createPrimaryKeyPhrase(Collection<HelenusProperty> properties) {
|
||||
List<String> p = new ArrayList<String>(properties.size());
|
||||
List<String> c = new ArrayList<String>(properties.size());
|
||||
|
||||
for (HelenusProperty prop : properties) {
|
||||
String columnName = prop.getColumnName().toCql();
|
||||
switch (prop.getColumnType()) {
|
||||
case PARTITION_KEY :
|
||||
p.add(columnName);
|
||||
break;
|
||||
case CLUSTERING_COLUMN :
|
||||
c.add(columnName);
|
||||
break;
|
||||
default :
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return "(" + ((p.size() > 1) ? "(" + String.join(", ", p) + ")" : p.get(0))
|
||||
+ ((c.size() > 0) ? ", " + ((c.size() > 1) ? "(" + String.join(", ", c) + ")" : c.get(0)) : "") + ")";
|
||||
}
|
||||
|
||||
public static SchemaStatement createMaterializedView(String keyspace, String viewName, HelenusEntity entity) {
|
||||
if (entity.getType() != HelenusEntityType.VIEW) {
|
||||
throw new HelenusMappingException("expected view entity " + entity);
|
||||
|
@ -162,20 +184,16 @@ public final class SchemaUtil {
|
|||
Class<?> iface = entity.getMappingInterface();
|
||||
String tableName = Helenus.entity(iface.getInterfaces()[0]).getName().toCql();
|
||||
Select.Where where = selection.from(tableName).where();
|
||||
List<String> p = new ArrayList<String>(props.size());
|
||||
List<String> c = new ArrayList<String>(props.size());
|
||||
List<String> o = new ArrayList<String>(props.size());
|
||||
|
||||
for (HelenusPropertyNode prop : props) {
|
||||
String columnName = prop.getColumnName();
|
||||
switch (prop.getProperty().getColumnType()) {
|
||||
case PARTITION_KEY :
|
||||
p.add(columnName);
|
||||
where = where.and(new IsNotNullClause(columnName));
|
||||
break;
|
||||
|
||||
case CLUSTERING_COLUMN :
|
||||
c.add(columnName);
|
||||
where = where.and(new IsNotNullClause(columnName));
|
||||
|
||||
ClusteringColumn clusteringColumn = prop.getProperty().getGetterMethod()
|
||||
|
@ -189,8 +207,7 @@ public final class SchemaUtil {
|
|||
}
|
||||
}
|
||||
|
||||
String primaryKey = "PRIMARY KEY (" + ((p.size() > 1) ? "(" + String.join(", ", p) + ")" : p.get(0))
|
||||
+ ((c.size() > 0) ? ", " + ((c.size() > 1) ? "(" + String.join(", ", c) + ")" : c.get(0)) : "") + ")";
|
||||
String primaryKey = "PRIMARY KEY " + createPrimaryKeyPhrase(entity.getOrderedProperties());
|
||||
|
||||
String clustering = "";
|
||||
if (o.size() > 0) {
|
||||
|
|
|
@ -15,13 +15,10 @@
|
|||
*/
|
||||
package net.helenus.core;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import net.helenus.core.cache.BoundFacet;
|
||||
import net.helenus.core.cache.Facet;
|
||||
import net.helenus.support.Either;
|
||||
|
||||
public interface UnitOfWork<X extends Exception> extends AutoCloseable {
|
||||
|
||||
|
@ -56,11 +53,7 @@ public interface UnitOfWork<X extends Exception> extends AutoCloseable {
|
|||
|
||||
boolean hasCommitted();
|
||||
|
||||
Optional<Either<Object, Set<Object>>> cacheLookup(String key);
|
||||
Optional<Object> cacheLookup(List<Facet> facets);
|
||||
|
||||
Optional<Either<Object, Set<Object>>> cacheLookupByFacet(Set<Facet> facets);
|
||||
|
||||
Optional<Either<Object, Set<Object>>> cacheLookupByStatement(String[] statementKeys);
|
||||
|
||||
void cacheUpdate(Either<Object, Set<Object>> pojo, String[] statementKeys, Map<String, BoundFacet> facets);
|
||||
void cacheUpdate(Object pojo, List<Facet> facets);
|
||||
}
|
||||
|
|
|
@ -20,15 +20,19 @@ import java.util.stream.Collectors;
|
|||
|
||||
import net.helenus.mapping.HelenusProperty;
|
||||
|
||||
public class BoundFacet extends Facet {
|
||||
public class BoundFacet extends Facet<String> {
|
||||
private final Map<HelenusProperty, Object> properties;
|
||||
|
||||
BoundFacet(Map<HelenusProperty, Object> properties) {
|
||||
BoundFacet(String name, Map<HelenusProperty, Object> properties) {
|
||||
super(name,
|
||||
(properties.keySet().size() > 1)
|
||||
? "[" + String.join(", ",
|
||||
properties.keySet().stream().map(key -> properties.get(key).toString())
|
||||
.collect(Collectors.toSet()))
|
||||
+ "]"
|
||||
: String.join("", properties.keySet().stream().map(key -> properties.get(key).toString())
|
||||
.collect(Collectors.toSet())));
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return String.join(";",
|
||||
properties.keySet().stream().map(key -> properties.get(key).toString()).collect(Collectors.toSet()));
|
||||
}
|
||||
}
|
||||
|
|
55
src/main/java/net/helenus/core/cache/Facet.java
vendored
55
src/main/java/net/helenus/core/cache/Facet.java
vendored
|
@ -1,23 +1,56 @@
|
|||
/*
|
||||
* Copyright (C) 2015 The Helenus Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package net.helenus.core.cache;
|
||||
|
||||
public class Facet {
|
||||
}
|
||||
/*
|
||||
*
|
||||
/**
|
||||
* An Entity is identifiable via one or more Facets
|
||||
*
|
||||
*
|
||||
* A Facet is is a set of Properties and bound Facets
|
||||
*
|
||||
*
|
||||
* An Entity will have it's Keyspace, Table and Schema Version Facets bound.
|
||||
*
|
||||
*
|
||||
* A property may also have a TTL or write time bound.
|
||||
*
|
||||
*
|
||||
* The cache contains key->value mappings of merkel-hash -> Entity or
|
||||
* Set<Entity> The only way a Set<Entity> is put into the cache is with a key =
|
||||
* hash([Entity's bound Facets, hash(filter clause from SELECT)])
|
||||
*
|
||||
*
|
||||
* REMEMBER to update the cache on build() for all impacted facets, delete
|
||||
* existing keys and add new keys
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class Facet<T> {
|
||||
private final String name;
|
||||
private T value;
|
||||
|
||||
public Facet(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public Facet(String name, T value) {
|
||||
this.name = name;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public String name() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public T value() {
|
||||
return value;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -15,44 +15,45 @@
|
|||
*/
|
||||
package net.helenus.core.cache;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import net.helenus.core.SchemaUtil;
|
||||
import net.helenus.mapping.HelenusProperty;
|
||||
|
||||
public class EntityIdentifyingFacet extends Facet {
|
||||
public class UnboundFacet extends Facet<String> {
|
||||
|
||||
private final Set<HelenusProperty> properties;
|
||||
private final List<HelenusProperty> properties;
|
||||
|
||||
public EntityIdentifyingFacet(HelenusProperty prop) {
|
||||
properties = new HashSet<HelenusProperty>();
|
||||
properties.add(prop);
|
||||
public UnboundFacet(List<HelenusProperty> properties) {
|
||||
super(SchemaUtil.createPrimaryKeyPhrase(properties));
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public EntityIdentifyingFacet(Set<HelenusProperty> props) {
|
||||
properties = props;
|
||||
public UnboundFacet(HelenusProperty property) {
|
||||
super(property.getPropertyName());
|
||||
properties = new ArrayList<HelenusProperty>();
|
||||
properties.add(property);
|
||||
}
|
||||
|
||||
public boolean isFullyBound() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public Set<HelenusProperty> getProperties() {
|
||||
public List<HelenusProperty> getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
public Binder binder() {
|
||||
return new Binder(properties);
|
||||
return new Binder(name(), properties);
|
||||
}
|
||||
|
||||
public static class Binder {
|
||||
|
||||
private final Set<HelenusProperty> properties = new HashSet<HelenusProperty>();
|
||||
private final String name;
|
||||
private final List<HelenusProperty> properties = new ArrayList<HelenusProperty>();
|
||||
private Map<HelenusProperty, Object> boundProperties = new HashMap<HelenusProperty, Object>();
|
||||
|
||||
Binder(Set<HelenusProperty> properties) {
|
||||
Binder(String name, List<HelenusProperty> properties) {
|
||||
this.name = name;
|
||||
this.properties.addAll(properties);
|
||||
}
|
||||
|
||||
|
@ -62,12 +63,12 @@ public class EntityIdentifyingFacet extends Facet {
|
|||
return this;
|
||||
}
|
||||
|
||||
public boolean isFullyBound() {
|
||||
public boolean isBound() {
|
||||
return properties.isEmpty();
|
||||
}
|
||||
|
||||
public BoundFacet bind() {
|
||||
return new BoundFacet(boundProperties);
|
||||
return new BoundFacet(name, boundProperties);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,8 +15,8 @@
|
|||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
@ -80,9 +80,8 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
|
|||
String[] statementKeys = null;
|
||||
|
||||
if (enableCache) {
|
||||
Set<Facet> facets = bindFacetValues();
|
||||
statementKeys = getQueryKeys();
|
||||
cacheResult = checkCache(uow, facets, statementKeys);
|
||||
List<Facet> facets = bindFacetValues();
|
||||
cacheResult = checkCache(uow, facets);
|
||||
if (cacheResult != null) {
|
||||
result = Optional.of(cacheResult);
|
||||
}
|
||||
|
@ -101,7 +100,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
|
|||
// need to put this result
|
||||
// into the cache for future requests to find.
|
||||
if (enableCache && cacheResult == null && result.isPresent()) {
|
||||
updateCache(uow, result.get(), getIdentifyingFacets(), statementKeys);
|
||||
updateCache(uow, result.get(), getFacets());
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
|
@ -15,10 +15,10 @@
|
|||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
@ -39,12 +39,10 @@ import brave.Tracer;
|
|||
import brave.propagation.TraceContext;
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.core.UnitOfWork;
|
||||
import net.helenus.core.cache.BoundFacet;
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
import net.helenus.core.cache.Facet;
|
||||
import net.helenus.core.cache.UnboundFacet;
|
||||
import net.helenus.core.reflect.MapExportable;
|
||||
import net.helenus.mapping.value.BeanColumnValueProvider;
|
||||
import net.helenus.support.Either;
|
||||
import net.helenus.support.HelenusException;
|
||||
|
||||
public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>> extends Operation<E> {
|
||||
|
@ -320,42 +318,16 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
|
|||
throw new HelenusException("only RegularStatements can be prepared");
|
||||
}
|
||||
|
||||
protected E checkCache(UnitOfWork<?> uow, Set<Facet> facets, String[] statementKeys) {
|
||||
protected E checkCache(UnitOfWork<?> uow, List<Facet> facets) {
|
||||
E result = null;
|
||||
Optional<Either<Object, Set<Object>>> optionalCachedResult = Optional.empty();
|
||||
Optional<Object> optionalCachedResult = Optional.empty();
|
||||
|
||||
if (!facets.isEmpty()) {
|
||||
// TODO(gburd): what about select ResultSet, Tuple... etc.?
|
||||
optionalCachedResult = uow.cacheLookupByFacet(facets);
|
||||
optionalCachedResult = uow.cacheLookup(facets);
|
||||
if (optionalCachedResult.isPresent()) {
|
||||
Either<Object, Set<Object>> eitherCachedResult = optionalCachedResult.get();
|
||||
if (eitherCachedResult.isLeft()) {
|
||||
uowCacheHits.mark();
|
||||
logger.info("UnitOfWork({}) cache hit using facets", uow.hashCode());
|
||||
result = (E) eitherCachedResult.getLeft();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (result == null && statementKeys != null) {
|
||||
// Then check to see if this query happens to uniquely identify a single object
|
||||
// in thecache.
|
||||
optionalCachedResult = uow.cacheLookupByStatement(statementKeys);
|
||||
if (optionalCachedResult.isPresent()) {
|
||||
Either<Object, Set<Object>> eitherCachedResult = optionalCachedResult.get();
|
||||
// Statements always store Set<E> as the value in the cache.
|
||||
if (eitherCachedResult.isRight()) {
|
||||
Set<Object> cachedResult = eitherCachedResult.getRight();
|
||||
if (cachedResult.size() == 1) {
|
||||
Optional<Object> maybeResult = cachedResult.stream().findFirst();
|
||||
if (maybeResult.isPresent()) {
|
||||
uowCacheHits.mark();
|
||||
logger.info("UnitOfWork({}) cache hit for stmt", uow.hashCode());
|
||||
} else {
|
||||
result = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
uowCacheHits.mark();
|
||||
logger.info("UnitOfWork({}) cache hit using facets", uow.hashCode());
|
||||
result = (E) optionalCachedResult.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -367,30 +339,29 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
|
|||
return result;
|
||||
}
|
||||
|
||||
protected void updateCache(UnitOfWork<?> uow, E pojo, Map<String, EntityIdentifyingFacet> facetMap,
|
||||
String[] statementKeys) {
|
||||
|
||||
// Insert this entity into the cache for each facet for this entity that we can
|
||||
// fully bind.
|
||||
Map<String, BoundFacet> boundFacets = new HashMap<String, BoundFacet>();
|
||||
protected void updateCache(UnitOfWork<?> uow, E pojo, List<Facet> identifyingFacets) {
|
||||
List<Facet> facets = new ArrayList<>();
|
||||
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
|
||||
facetMap.forEach((facetName, facet) -> {
|
||||
if (!facet.isFullyBound()) {
|
||||
EntityIdentifyingFacet.Binder binder = facet.binder();
|
||||
facet.getProperties().forEach(prop -> {
|
||||
|
||||
for (Facet facet : identifyingFacets) {
|
||||
if (facet instanceof UnboundFacet) {
|
||||
UnboundFacet unboundFacet = (UnboundFacet) facet;
|
||||
UnboundFacet.Binder binder = unboundFacet.binder();
|
||||
unboundFacet.getProperties().forEach(prop -> {
|
||||
if (valueMap == null) {
|
||||
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
|
||||
binder.setValueForProperty(prop, prop.getColumnName().toCql() + "==" + value.toString());
|
||||
binder.setValueForProperty(prop, value.toString());
|
||||
} else {
|
||||
binder.setValueForProperty(prop,
|
||||
prop.getColumnName().toCql() + "==" + valueMap.get(prop.getPropertyName()).toString());
|
||||
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
|
||||
}
|
||||
facets.add(binder.bind());
|
||||
});
|
||||
boundFacets.put(facetName, binder.bind());
|
||||
} else {
|
||||
facets.add(facet);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Cache the value (pojo), the statement key, and the fully bound facets.
|
||||
uow.cacheUpdate(Either.left(pojo), statementKeys, boundFacets);
|
||||
uow.cacheUpdate(pojo, facets);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
@ -76,12 +76,10 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
|
|||
try {
|
||||
Stream<E> result = null;
|
||||
E cachedResult = null;
|
||||
String[] statementKeys = null;
|
||||
|
||||
if (enableCache) {
|
||||
Set<Facet> facets = bindFacetValues();
|
||||
statementKeys = getQueryKeys();
|
||||
cachedResult = checkCache(uow, facets, statementKeys);
|
||||
List<Facet> facets = bindFacetValues();
|
||||
cachedResult = checkCache(uow, facets);
|
||||
if (cachedResult != null) {
|
||||
result = Stream.of(cachedResult);
|
||||
}
|
||||
|
@ -96,7 +94,7 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
|
|||
// If we have a result and we're caching then we need to put it into the cache
|
||||
// for future requests to find.
|
||||
if (enableCache && cachedResult != null) {
|
||||
updateCache(uow, cachedResult, getIdentifyingFacets(), statementKeys);
|
||||
updateCache(uow, cachedResult, getFacets());
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.List;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import com.datastax.driver.core.BoundStatement;
|
||||
|
@ -36,12 +36,7 @@ public final class BoundStreamOperation<E> extends AbstractStreamOperation<E, Bo
|
|||
}
|
||||
|
||||
@Override
|
||||
public String[] getQueryKeys() {
|
||||
return delegate.getQueryKeys();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Facet> bindFacetValues() {
|
||||
public List<Facet> bindFacetValues() {
|
||||
return delegate.bindFacetValues();
|
||||
}
|
||||
|
||||
|
|
|
@ -236,23 +236,6 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getQueryKeys() {
|
||||
List<String> keys = new ArrayList<>(values.size());
|
||||
values.forEach(t -> {
|
||||
HelenusPropertyNode prop = t._1;
|
||||
switch (prop.getProperty().getColumnType()) {
|
||||
case PARTITION_KEY :
|
||||
case CLUSTERING_COLUMN :
|
||||
keys.add(entity.getName().toCql() + '.' + prop.getColumnName() + "==" + t._2.toString());
|
||||
break;
|
||||
default :
|
||||
break;
|
||||
}
|
||||
});
|
||||
return keys.toArray(new String[keys.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T sync(UnitOfWork uow) throws TimeoutException {
|
||||
if (uow == null) {
|
||||
|
@ -261,7 +244,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
T result = super.sync(uow);
|
||||
Class<?> iface = entity.getMappingInterface();
|
||||
if (resultType == iface) {
|
||||
updateCache(uow, result, entity.getIdentifyingFacets(), getQueryKeys());
|
||||
updateCache(uow, result, entity.getFacets());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -15,8 +15,7 @@
|
|||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
|
@ -32,7 +31,6 @@ import brave.Tracer;
|
|||
import brave.propagation.TraceContext;
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.core.UnitOfWork;
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
import net.helenus.core.cache.Facet;
|
||||
|
||||
public abstract class Operation<E> {
|
||||
|
@ -87,15 +85,12 @@ public abstract class Operation<E> {
|
|||
return null;
|
||||
}
|
||||
|
||||
public String[] getQueryKeys() {
|
||||
public List<Facet> getFacets() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Map<String, EntityIdentifyingFacet> getIdentifyingFacets() {
|
||||
public List<Facet> bindFacetValues() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Set<Facet> bindFacetValues() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,15 +15,13 @@
|
|||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.querybuilder.BuiltStatement;
|
||||
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
import net.helenus.core.cache.Facet;
|
||||
|
||||
public final class SelectFirstOperation<E> extends AbstractFilterOptionalOperation<E, SelectFirstOperation<E>> {
|
||||
|
@ -48,17 +46,12 @@ public final class SelectFirstOperation<E> extends AbstractFilterOptionalOperati
|
|||
}
|
||||
|
||||
@Override
|
||||
public String[] getQueryKeys() {
|
||||
return delegate.getQueryKeys();
|
||||
public List<Facet> getFacets() {
|
||||
return delegate.getFacets();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, EntityIdentifyingFacet> getIdentifyingFacets() {
|
||||
return delegate.getIdentifyingFacets();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Facet> bindFacetValues() {
|
||||
public List<Facet> bindFacetValues() {
|
||||
return delegate.bindFacetValues();
|
||||
}
|
||||
|
||||
|
|
|
@ -15,8 +15,8 @@
|
|||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
|
@ -41,12 +41,7 @@ public final class SelectFirstTransformingOperation<R, E>
|
|||
}
|
||||
|
||||
@Override
|
||||
public String[] getQueryKeys() {
|
||||
return delegate.getQueryKeys();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Facet> bindFacetValues() {
|
||||
public List<Facet> bindFacetValues() {
|
||||
return delegate.bindFacetValues();
|
||||
}
|
||||
|
||||
|
|
|
@ -31,8 +31,8 @@ import com.datastax.driver.core.querybuilder.Select.Where;
|
|||
import com.google.common.collect.Iterables;
|
||||
|
||||
import net.helenus.core.*;
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
import net.helenus.core.cache.Facet;
|
||||
import net.helenus.core.cache.UnboundFacet;
|
||||
import net.helenus.core.reflect.HelenusPropertyNode;
|
||||
import net.helenus.mapping.HelenusEntity;
|
||||
import net.helenus.mapping.MappingUtil;
|
||||
|
@ -177,31 +177,37 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, EntityIdentifyingFacet> getIdentifyingFacets() {
|
||||
public List<Facet> getFacets() {
|
||||
HelenusEntity entity = props.get(0).getEntity();
|
||||
return entity.getIdentifyingFacets();
|
||||
return entity.getFacets();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Facet> bindFacetValues() {
|
||||
public List<Facet> bindFacetValues() {
|
||||
HelenusEntity entity = props.get(0).getEntity();
|
||||
Set<Facet> boundFacets = new HashSet<Facet>();
|
||||
// Check to see if this select statement has enough information to build one or
|
||||
// more identifying facets.
|
||||
entity.getIdentifyingFacets().forEach((facetName, facet) -> {
|
||||
EntityIdentifyingFacet.Binder binder = facet.binder();
|
||||
facet.getProperties().forEach(prop -> {
|
||||
Filter filter = filters.get(prop);
|
||||
if (filter != null) {
|
||||
binder.setValueForProperty(prop, filter.toString());
|
||||
} else if (facetName.equals("*")) {
|
||||
binder.setValueForProperty(prop, "");
|
||||
List<Facet> boundFacets = new ArrayList<>();
|
||||
|
||||
for (Facet facet : entity.getFacets()) {
|
||||
if (facet instanceof UnboundFacet) {
|
||||
UnboundFacet unboundFacet = (UnboundFacet) facet;
|
||||
UnboundFacet.Binder binder = unboundFacet.binder();
|
||||
unboundFacet.getProperties().forEach(prop -> {
|
||||
Filter filter = filters.get(prop);
|
||||
if (filter != null) {
|
||||
Object[] postulates = filter.postulateValues();
|
||||
for (Object p : postulates) {
|
||||
binder.setValueForProperty(prop, p.toString());
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
if (binder.isBound()) {
|
||||
boundFacets.add(binder.bind());
|
||||
}
|
||||
});
|
||||
if (binder.isFullyBound()) {
|
||||
boundFacets.add(binder.bind());
|
||||
} else {
|
||||
boundFacets.add(facet);
|
||||
}
|
||||
});
|
||||
}
|
||||
return boundFacets;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,15 +15,13 @@
|
|||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.querybuilder.BuiltStatement;
|
||||
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
import net.helenus.core.cache.Facet;
|
||||
|
||||
public final class SelectTransformingOperation<R, E>
|
||||
|
@ -43,18 +41,13 @@ public final class SelectTransformingOperation<R, E>
|
|||
}
|
||||
|
||||
@Override
|
||||
public String[] getQueryKeys() {
|
||||
return delegate.getQueryKeys();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Facet> bindFacetValues() {
|
||||
public List<Facet> bindFacetValues() {
|
||||
return delegate.bindFacetValues();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, EntityIdentifyingFacet> getIdentifyingFacets() {
|
||||
return delegate.getIdentifyingFacets();
|
||||
public List<Facet> getFacets() {
|
||||
return delegate.getFacets();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -578,7 +578,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
|
|||
// TODO(gburd): Only drafted entity objects are updated in the cache at this
|
||||
// time.
|
||||
if (draft != null) {
|
||||
updateCache(uow, result, getIdentifyingFacets(), getQueryKeys());
|
||||
updateCache(uow, result, getFacets());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -16,9 +16,9 @@
|
|||
package net.helenus.mapping;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.List;
|
||||
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
import net.helenus.core.cache.Facet;
|
||||
|
||||
public interface HelenusEntity {
|
||||
|
||||
|
@ -34,5 +34,5 @@ public interface HelenusEntity {
|
|||
|
||||
HelenusProperty getProperty(String name);
|
||||
|
||||
Map<String, EntityIdentifyingFacet> getIdentifyingFacets();
|
||||
List<Facet> getFacets();
|
||||
}
|
||||
|
|
|
@ -28,7 +28,8 @@ import com.google.common.collect.ImmutableMap;
|
|||
import net.helenus.config.HelenusSettings;
|
||||
import net.helenus.core.Helenus;
|
||||
import net.helenus.core.annotation.Cacheable;
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
import net.helenus.core.cache.Facet;
|
||||
import net.helenus.core.cache.UnboundFacet;
|
||||
import net.helenus.mapping.annotation.*;
|
||||
import net.helenus.support.HelenusMappingException;
|
||||
|
||||
|
@ -41,9 +42,7 @@ public final class HelenusMappingEntity implements HelenusEntity {
|
|||
private final ImmutableMap<String, Method> methods;
|
||||
private final ImmutableMap<String, HelenusProperty> props;
|
||||
private final ImmutableList<HelenusProperty> orderedProps;
|
||||
private final EntityIdentifyingFacet primaryIdentityFacet;
|
||||
private final ImmutableMap<String, EntityIdentifyingFacet> allIdentityFacets;
|
||||
private final ImmutableMap<String, EntityIdentifyingFacet> ancillaryIdentityFacets;
|
||||
private final List<Facet> facets;
|
||||
|
||||
public HelenusMappingEntity(Class<?> iface, Metadata metadata) {
|
||||
this(iface, autoDetectType(iface), metadata);
|
||||
|
@ -112,33 +111,31 @@ public final class HelenusMappingEntity implements HelenusEntity {
|
|||
// Caching
|
||||
cacheable = (null != iface.getDeclaredAnnotation(Cacheable.class));
|
||||
|
||||
ImmutableMap.Builder<String, EntityIdentifyingFacet> allFacetsBuilder = ImmutableMap.builder();
|
||||
ImmutableMap.Builder<String, EntityIdentifyingFacet> ancillaryFacetsBuilder = ImmutableMap.builder();
|
||||
EntityIdentifyingFacet primaryFacet = null;
|
||||
List<HelenusProperty> primaryProperties = new ArrayList<HelenusProperty>(4);
|
||||
for (HelenusProperty prop : propsLocal) {
|
||||
List<HelenusProperty> primaryKeyProperties = new ArrayList<>();
|
||||
ImmutableList.Builder<Facet> facetsBuilder = ImmutableList.builder();
|
||||
facetsBuilder.add(new Facet("table", name.toCql()));
|
||||
for (HelenusProperty prop : orderedProps) {
|
||||
switch (prop.getColumnType()) {
|
||||
case PARTITION_KEY :
|
||||
case CLUSTERING_COLUMN :
|
||||
primaryProperties.add(prop);
|
||||
primaryKeyProperties.add(prop);
|
||||
break;
|
||||
default :
|
||||
if (primaryProperties != null) {
|
||||
primaryFacet = new EntityIdentifyingFacet(new HashSet<HelenusProperty>(primaryProperties));
|
||||
allFacetsBuilder.put("*", primaryFacet);
|
||||
primaryProperties = null;
|
||||
if (primaryKeyProperties != null && primaryKeyProperties.size() > 0) {
|
||||
facetsBuilder.add(new UnboundFacet(primaryKeyProperties));
|
||||
primaryKeyProperties = null;
|
||||
}
|
||||
Optional<IdentityName> optionalIndexName = prop.getIndexName();
|
||||
if (optionalIndexName.isPresent()) {
|
||||
EntityIdentifyingFacet facet = new EntityIdentifyingFacet(prop);
|
||||
ancillaryFacetsBuilder.put(prop.getPropertyName(), facet);
|
||||
allFacetsBuilder.put(prop.getPropertyName(), facet);
|
||||
UnboundFacet facet = new UnboundFacet(prop);
|
||||
facetsBuilder.add(facet);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.primaryIdentityFacet = primaryFacet;
|
||||
this.ancillaryIdentityFacets = ancillaryFacetsBuilder.build();
|
||||
this.allIdentityFacets = allFacetsBuilder.build();
|
||||
if (primaryKeyProperties != null && primaryKeyProperties.size() > 0) {
|
||||
facetsBuilder.add(new UnboundFacet(primaryKeyProperties));
|
||||
}
|
||||
this.facets = facetsBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -172,8 +169,8 @@ public final class HelenusMappingEntity implements HelenusEntity {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, EntityIdentifyingFacet> getIdentifyingFacets() {
|
||||
return allIdentityFacets;
|
||||
public List<Facet> getFacets() {
|
||||
return facets;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in a new issue