WIP: working toward a faceted cache.
This commit is contained in:
parent
16af6ea175
commit
efa87b2d4f
10 changed files with 179 additions and 42 deletions
5
NOTES
5
NOTES
|
@ -1,3 +1,8 @@
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
--- Cache
|
||||
// `E` is the type of the Entity class or one of:
|
||||
// - ResultSet
|
||||
|
|
|
@ -181,11 +181,11 @@ public final class HelenusSession extends AbstractSessionOperations implements C
|
|||
|
||||
public Metadata getMetadata() { return metadata; }
|
||||
|
||||
public synchronized UnitOfWork begin() {
|
||||
public synchronized <T extends UnitOfWork> T begin() {
|
||||
return begin(null);
|
||||
}
|
||||
|
||||
public synchronized UnitOfWork begin(UnitOfWork parent) {
|
||||
public synchronized <T extends UnitOfWork> T begin(T parent) {
|
||||
try {
|
||||
Class<? extends UnitOfWork> clazz = unitOfWorkClass;
|
||||
Constructor<? extends UnitOfWork> ctor = clazz.getConstructor(HelenusSession.class, UnitOfWork.class);
|
||||
|
@ -193,7 +193,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
|
|||
if (parent != null) {
|
||||
parent.addNestedUnitOfWork(uow);
|
||||
}
|
||||
return uow.begin();
|
||||
return (T) uow.begin();
|
||||
}
|
||||
catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
|
||||
throw new HelenusException(String.format("Unable to instantiate {} as a UnitOfWork.", unitOfWorkClass.getSimpleName()), e);
|
||||
|
|
|
@ -14,9 +14,9 @@ import org.springframework.core.annotation.AnnotationUtils;
|
|||
import org.springframework.util.Assert;
|
||||
|
||||
@Aspect
|
||||
public class RetryConcurrentUnitOfWorkAspect {
|
||||
public class RetryAspect {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(RetryConcurrentUnitOfWorkAspect.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(RetryAspect.class);
|
||||
|
||||
@Around("@annotation(net.helenus.core.annotations.Retry)")
|
||||
public Object retry(ProceedingJoinPoint pjp) throws Throwable {
|
14
src/main/java/net/helenus/core/cache/EntityIdentifyingFacet.java
vendored
Normal file
14
src/main/java/net/helenus/core/cache/EntityIdentifyingFacet.java
vendored
Normal file
|
@ -0,0 +1,14 @@
|
|||
package net.helenus.core.cache;
|
||||
|
||||
import net.helenus.mapping.HelenusProperty;
|
||||
|
||||
public class EntityIdentifyingFacet extends Facet {
|
||||
|
||||
public EntityIdentifyingFacet(HelenusProperty prop) {
|
||||
|
||||
}
|
||||
|
||||
public EntityIdentifyingFacet(HelenusProperty[]... props) {
|
||||
|
||||
}
|
||||
}
|
21
src/main/java/net/helenus/core/cache/Facet.java
vendored
Normal file
21
src/main/java/net/helenus/core/cache/Facet.java
vendored
Normal file
|
@ -0,0 +1,21 @@
|
|||
package net.helenus.core.cache;
|
||||
|
||||
public class Facet {
|
||||
}
|
||||
/*
|
||||
|
||||
An Entity is identifiable via one or more Facets
|
||||
|
||||
A Facet is is a set of Properties and bound Facets
|
||||
|
||||
An Entity will have it's Keyspace, Table and Schema Version Facets bound.
|
||||
|
||||
A property may also have a TTL or write time bound.
|
||||
|
||||
The cache contains key->value mappings of merkel-hash -> Entity or Set<Entity>
|
||||
The only way a Set<Entity> is put into the cache is with a key = hash([Entity's bound Facets, hash(filter clause from SELECT)])
|
||||
|
||||
REMEMBER to update the cache on build() for all impacted facets, delete existing keys and add new keys
|
||||
|
||||
|
||||
*/
|
|
@ -23,12 +23,22 @@ import com.google.common.util.concurrent.Futures;
|
|||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.core.Filter;
|
||||
import net.helenus.core.Helenus;
|
||||
import net.helenus.core.UnitOfWork;
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
import net.helenus.mapping.HelenusEntity;
|
||||
import net.helenus.mapping.HelenusProperty;
|
||||
import net.helenus.mapping.value.BeanColumnValueProvider;
|
||||
import net.helenus.support.Either;
|
||||
|
||||
import javax.swing.text.html.parser.Entity;
|
||||
|
||||
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
|
||||
extends AbstractStatementOperation<E, O> {
|
||||
|
@ -72,25 +82,74 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
|
|||
try {
|
||||
|
||||
Optional<E> result = null;
|
||||
String key = getStatementCacheKey();
|
||||
if (enableCache && key != null) {
|
||||
Set<E> cachedResult = (Set<E>) uow.cacheLookup(key);
|
||||
if (cachedResult != null) {
|
||||
//TODO(gburd): what about select ResultSet, Tuple... etc.?
|
||||
uowCacheHits.mark();
|
||||
logger.info("UOW({}) cache hit, {}", uow.hashCode(), key);
|
||||
result = cachedResult.stream().findFirst();
|
||||
} else {
|
||||
uowCacheMiss.mark();
|
||||
}
|
||||
String stmtKey = null;
|
||||
if (enableCache) {
|
||||
Set<EntityIdentifyingFacet> facets = getIdentifyingFacets();
|
||||
if (!facets.isEmpty()) {
|
||||
for (EntityIdentifyingFacet facet : facets) {
|
||||
//TODO(gburd): what about select ResultSet, Tuple... etc.?
|
||||
Optional<Either<Set<E>, E>> optionalCachedResult = uow.cacheLookup(facet.hashCode());
|
||||
if (optionalCachedResult.isPresent()) {
|
||||
uowCacheHits.mark();
|
||||
logger.info("UnitOfWork({}) cache hit for facet: {} with key: {}", uow.hashCode(), facet.toString(), facet.hashCode());
|
||||
Either<Set<E>, E> eitherCachedResult = optionalCachedResult.get();
|
||||
if (eitherCachedResult.isRight()) {
|
||||
E cachedResult = eitherCachedResult.getRight();
|
||||
result = Optional.of(cachedResult);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// The operation didn't provide enough information to uniquely identify the entity object
|
||||
// using one of the facets, but that doesn't mean a filtering query won't return a proper
|
||||
// result. Look in the cache to see if this statement has been executed before.
|
||||
stmtKey = getStatementCacheKey();
|
||||
Optional<Either<Set<E>, E>> optionalCachedResult = uow.cacheLookup(stmtKey.hashCode());
|
||||
if (optionalCachedResult.isPresent()) {
|
||||
Either<Set<E>, E> eitherCachedResult = optionalCachedResult.get();
|
||||
if (eitherCachedResult.isLeft()) {
|
||||
Set<E> cachedResult = eitherCachedResult.getLeft();
|
||||
// Ensure that this non-indexed selection uniquely identified an Entity.
|
||||
if (!(cachedResult.isEmpty() || cachedResult.size() > 1)) {
|
||||
uowCacheHits.mark();
|
||||
logger.info("UnitOfWork({}) cache hit for stmt {} {}", uow.hashCode(), stmtKey,
|
||||
stmtKey.hashCode());
|
||||
result = cachedResult.stream().findFirst();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (result == null) {
|
||||
uowCacheMiss.mark();
|
||||
ResultSet resultSet = execute(sessionOps, uow, traceContext, showValues, true);
|
||||
result = transform(resultSet);
|
||||
|
||||
if (key != null) {
|
||||
if (result.isPresent()) {
|
||||
if (enableCache && result.isPresent()) {
|
||||
// If we executed a query that didn't depend on an we have a stmtKey for the filters, add that to the cache.
|
||||
if (stmtKey != null) {
|
||||
Set<Object> set = new HashSet<Object>(1);
|
||||
set.add(result.get());
|
||||
uow.getCache().put(stmtKey.hashCode(), set);
|
||||
}
|
||||
// Now insert this entity into the cache for each facet for this entity that we can fully bind.
|
||||
E entity = result.get();
|
||||
Map<String, EntityIdentifyingFacet> facetMap = Helenus.entity(result.get().getClass()).getIdentityFacets();
|
||||
facetMap.forEach((facetName, facet) -> {
|
||||
EntityIdentifyingFacet boundFacet = null;
|
||||
if (!facet.isFullyBound()) {
|
||||
boundFacet = new EntityIdentifyingFacet(facet);
|
||||
for (HelenusProperty prop : facet.getUnboundEntityProperties()) {
|
||||
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(entity, -1, prop);
|
||||
if (value == null) { break; }
|
||||
boundFacet.setValueForProperty(prop, value);
|
||||
}
|
||||
}
|
||||
if (boundFacet != null && boundFacet.isFullyBound()) {
|
||||
uow.getCache().put(boundFacet.hashCode(), Either)
|
||||
}
|
||||
});
|
||||
Set<Object> set = new HashSet<Object>(1);
|
||||
set.add(result.get());
|
||||
uow.getCache().put(key, set);
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package net.helenus.core.operation;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
|
@ -14,7 +15,7 @@ import brave.Tracer;
|
|||
import brave.propagation.TraceContext;
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.core.UnitOfWork;
|
||||
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
|
||||
public abstract class Operation<E> {
|
||||
|
||||
|
@ -68,6 +69,6 @@ public abstract class Operation<E> {
|
|||
|
||||
public Statement buildStatement(boolean cached) { return null; }
|
||||
|
||||
public String getStatementCacheKey() { return null; }
|
||||
public Set<EntityIdentifyingFacet> getIdentifyingFacets() { return null; }
|
||||
|
||||
}
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.Row;
|
||||
import com.datastax.driver.core.querybuilder.BuiltStatement;
|
||||
import com.datastax.driver.core.querybuilder.Ordering;
|
||||
|
@ -31,8 +30,11 @@ import java.util.stream.StreamSupport;
|
|||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Iterables;
|
||||
import net.helenus.core.*;
|
||||
import net.helenus.core.cache.Facet;
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
import net.helenus.core.reflect.HelenusPropertyNode;
|
||||
import net.helenus.mapping.HelenusEntity;
|
||||
import net.helenus.mapping.HelenusProperty;
|
||||
import net.helenus.mapping.MappingUtil;
|
||||
import net.helenus.mapping.OrderingDirection;
|
||||
import net.helenus.mapping.value.ColumnValueProvider;
|
||||
|
@ -181,30 +183,26 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
|
|||
}
|
||||
|
||||
@Override
|
||||
public String getStatementCacheKey() {
|
||||
List<String> keys = new ArrayList<>(filters.size());
|
||||
public Set<EntityIdentifyingFacet> getIdentityFacets() {
|
||||
HelenusEntity entity = props.get(0).getEntity();
|
||||
|
||||
for (HelenusPropertyNode prop : props) {
|
||||
switch (prop.getProperty().getColumnType()) {
|
||||
case PARTITION_KEY:
|
||||
case CLUSTERING_COLUMN:
|
||||
|
||||
Filter filter = filters.get(prop.getProperty());
|
||||
if (filter != null) {
|
||||
keys.add(filter.toString());
|
||||
} else {
|
||||
return null;
|
||||
final Set<EntityIdentifyingFacet> facets = new HashSet<>(filters.size());
|
||||
// Check to see if this select statement has enough information to build one or
|
||||
// more identifying facets.
|
||||
entity.getIdentityFacets().forEach((facetName, facet) -> {
|
||||
EntityIdentifyingFacet boundFacet = null;
|
||||
if (!facet.isFullyBound()) {
|
||||
boundFacet = new EntityIdentifyingFacet(facet);
|
||||
for (HelenusProperty prop : facet.getUnboundEntityProperties()) {
|
||||
Filter filter = filters.get(facet.getProperty());
|
||||
if (filter == null) { break; }
|
||||
boundFacet.setValueForProperty(prop, filter.toString());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
if (keys.size() > 0) {
|
||||
return entity.getName() + ": " + Joiner.on(",").join(keys);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
if (boundFacet != null && boundFacet.isFullyBound()) {
|
||||
facets.add(boundFacet);
|
||||
}
|
||||
});
|
||||
return facets;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -15,7 +15,10 @@
|
|||
*/
|
||||
package net.helenus.mapping;
|
||||
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
public interface HelenusEntity {
|
||||
|
||||
|
@ -30,4 +33,7 @@ public interface HelenusEntity {
|
|||
Collection<HelenusProperty> getOrderedProperties();
|
||||
|
||||
HelenusProperty getProperty(String name);
|
||||
|
||||
Map<String, EntityIdentifyingFacet> getIdentityFacets();
|
||||
|
||||
}
|
||||
|
|
|
@ -20,9 +20,11 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.*;
|
||||
|
||||
import net.helenus.config.HelenusSettings;
|
||||
import net.helenus.core.Helenus;
|
||||
import net.helenus.core.annotation.Cacheable;
|
||||
import net.helenus.core.cache.EntityIdentifyingFacet;
|
||||
import net.helenus.mapping.annotation.*;
|
||||
import net.helenus.support.HelenusMappingException;
|
||||
import org.apache.commons.lang3.ClassUtils;
|
||||
|
@ -36,6 +38,9 @@ public final class HelenusMappingEntity implements HelenusEntity {
|
|||
private final ImmutableMap<String, Method> methods;
|
||||
private final ImmutableMap<String, HelenusProperty> props;
|
||||
private final ImmutableList<HelenusProperty> orderedProps;
|
||||
private final EntityIdentifyingFacet primaryIdentityFacet;
|
||||
private final ImmutableMap<String, EntityIdentifyingFacet> allIdentityFacets;
|
||||
private final ImmutableMap<String, EntityIdentifyingFacet> ancillaryIdentityFacets;
|
||||
|
||||
public HelenusMappingEntity(Class<?> iface, Metadata metadata) {
|
||||
this(iface, autoDetectType(iface), metadata);
|
||||
|
@ -101,7 +106,35 @@ public final class HelenusMappingEntity implements HelenusEntity {
|
|||
|
||||
validateOrdinals();
|
||||
|
||||
// Caching
|
||||
cacheable = (null != iface.getDeclaredAnnotation(Cacheable.class));
|
||||
|
||||
ImmutableMap.Builder<String, EntityIdentifyingFacet> allFacetsBuilder = ImmutableMap.builder();
|
||||
ImmutableMap.Builder<String, EntityIdentifyingFacet> ancillaryFacetsBuilder = ImmutableMap.builder();
|
||||
EntityIdentifyingFacet primaryFacet = null;
|
||||
List<HelenusProperty> primaryProperties = new ArrayList<HelenusProperty>(4);
|
||||
for (HelenusProperty prop : propsLocal) {
|
||||
switch(prop.getColumnType()) {
|
||||
case PARTITION_KEY:
|
||||
case CLUSTERING_COLUMN:
|
||||
primaryProperties.add(prop);
|
||||
break;
|
||||
default:
|
||||
if (primaryProperties != null) {
|
||||
primaryFacet = new EntityIdentifyingFacet(keyspace, table, schemaVersion, primaryProperties.toArray(new HelenusProperty[props.size()]));
|
||||
allFacetsBuilder.put("*", primaryFacet);
|
||||
primaryProperties = null;
|
||||
}
|
||||
Optional<IdentityName> optionalIndexName = prop.getIndexName();
|
||||
if (optionalIndexName.isPresent()) {
|
||||
EntityIdentifyingFacet facet = new EntityIdentifyingFacet(keyspace, table, schemaVersion, prop);
|
||||
ancillaryFacetsBuilder.put(prop.getPropertyName(), facet);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.primaryIdentityFacet = primaryFacet;
|
||||
this.ancillaryIdentityFacets = ancillaryFacetsBuilder.build();
|
||||
this.allIdentityFacets = allFacetsBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in a new issue