From efa87b2d4f4fc50b2fe4a399d0b144aa9c351c23 Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Tue, 26 Sep 2017 10:37:08 -0400 Subject: [PATCH] WIP: working toward a faceted cache. --- NOTES | 5 ++ .../java/net/helenus/core/HelenusSession.java | 6 +- ...UnitOfWorkAspect.java => RetryAspect.java} | 4 +- .../core/cache/EntityIdentifyingFacet.java | 14 +++ .../java/net/helenus/core/cache/Facet.java | 21 +++++ .../operation/AbstractOptionalOperation.java | 85 ++++++++++++++++--- .../net/helenus/core/operation/Operation.java | 5 +- .../core/operation/SelectOperation.java | 42 +++++---- .../net/helenus/mapping/HelenusEntity.java | 6 ++ .../helenus/mapping/HelenusMappingEntity.java | 33 +++++++ 10 files changed, 179 insertions(+), 42 deletions(-) rename src/main/java/net/helenus/core/aspect/{RetryConcurrentUnitOfWorkAspect.java => RetryAspect.java} (97%) create mode 100644 src/main/java/net/helenus/core/cache/EntityIdentifyingFacet.java create mode 100644 src/main/java/net/helenus/core/cache/Facet.java diff --git a/NOTES b/NOTES index 04dd989..038b1e9 100644 --- a/NOTES +++ b/NOTES @@ -1,3 +1,8 @@ + + + + + --- Cache // `E` is the type of the Entity class or one of: // - ResultSet diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index db08f8e..785fba6 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -181,11 +181,11 @@ public final class HelenusSession extends AbstractSessionOperations implements C public Metadata getMetadata() { return metadata; } - public synchronized UnitOfWork begin() { + public synchronized T begin() { return begin(null); } - public synchronized UnitOfWork begin(UnitOfWork parent) { + public synchronized T begin(T parent) { try { Class clazz = unitOfWorkClass; Constructor ctor = clazz.getConstructor(HelenusSession.class, UnitOfWork.class); @@ -193,7 +193,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C if (parent != null) { parent.addNestedUnitOfWork(uow); } - return uow.begin(); + return (T) uow.begin(); } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { throw new HelenusException(String.format("Unable to instantiate {} as a UnitOfWork.", unitOfWorkClass.getSimpleName()), e); diff --git a/src/main/java/net/helenus/core/aspect/RetryConcurrentUnitOfWorkAspect.java b/src/main/java/net/helenus/core/aspect/RetryAspect.java similarity index 97% rename from src/main/java/net/helenus/core/aspect/RetryConcurrentUnitOfWorkAspect.java rename to src/main/java/net/helenus/core/aspect/RetryAspect.java index 7d52f16..e828f8b 100644 --- a/src/main/java/net/helenus/core/aspect/RetryConcurrentUnitOfWorkAspect.java +++ b/src/main/java/net/helenus/core/aspect/RetryAspect.java @@ -14,9 +14,9 @@ import org.springframework.core.annotation.AnnotationUtils; import org.springframework.util.Assert; @Aspect -public class RetryConcurrentUnitOfWorkAspect { +public class RetryAspect { - private static final Logger log = LoggerFactory.getLogger(RetryConcurrentUnitOfWorkAspect.class); + private static final Logger log = LoggerFactory.getLogger(RetryAspect.class); @Around("@annotation(net.helenus.core.annotations.Retry)") public Object retry(ProceedingJoinPoint pjp) throws Throwable { diff --git a/src/main/java/net/helenus/core/cache/EntityIdentifyingFacet.java b/src/main/java/net/helenus/core/cache/EntityIdentifyingFacet.java new file mode 100644 index 0000000..8bf3b8b --- /dev/null +++ b/src/main/java/net/helenus/core/cache/EntityIdentifyingFacet.java @@ -0,0 +1,14 @@ +package net.helenus.core.cache; + +import net.helenus.mapping.HelenusProperty; + +public class EntityIdentifyingFacet extends Facet { + + public EntityIdentifyingFacet(HelenusProperty prop) { + + } + + public EntityIdentifyingFacet(HelenusProperty[]... props) { + + } +} diff --git a/src/main/java/net/helenus/core/cache/Facet.java b/src/main/java/net/helenus/core/cache/Facet.java new file mode 100644 index 0000000..eb521b1 --- /dev/null +++ b/src/main/java/net/helenus/core/cache/Facet.java @@ -0,0 +1,21 @@ +package net.helenus.core.cache; + +public class Facet { +} +/* + +An Entity is identifiable via one or more Facets + +A Facet is is a set of Properties and bound Facets + +An Entity will have it's Keyspace, Table and Schema Version Facets bound. + +A property may also have a TTL or write time bound. + +The cache contains key->value mappings of merkel-hash -> Entity or Set +The only way a Set is put into the cache is with a key = hash([Entity's bound Facets, hash(filter clause from SELECT)]) + +REMEMBER to update the cache on build() for all impacted facets, delete existing keys and add new keys + + + */ diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index 93ec79d..221fcf7 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -23,12 +23,22 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.util.HashSet; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import net.helenus.core.AbstractSessionOperations; +import net.helenus.core.Filter; +import net.helenus.core.Helenus; import net.helenus.core.UnitOfWork; +import net.helenus.core.cache.EntityIdentifyingFacet; +import net.helenus.mapping.HelenusEntity; +import net.helenus.mapping.HelenusProperty; +import net.helenus.mapping.value.BeanColumnValueProvider; +import net.helenus.support.Either; + +import javax.swing.text.html.parser.Entity; public abstract class AbstractOptionalOperation> extends AbstractStatementOperation { @@ -72,25 +82,74 @@ public abstract class AbstractOptionalOperation result = null; - String key = getStatementCacheKey(); - if (enableCache && key != null) { - Set cachedResult = (Set) uow.cacheLookup(key); - if (cachedResult != null) { - //TODO(gburd): what about select ResultSet, Tuple... etc.? - uowCacheHits.mark(); - logger.info("UOW({}) cache hit, {}", uow.hashCode(), key); - result = cachedResult.stream().findFirst(); - } else { - uowCacheMiss.mark(); - } + String stmtKey = null; + if (enableCache) { + Set facets = getIdentifyingFacets(); + if (!facets.isEmpty()) { + for (EntityIdentifyingFacet facet : facets) { + //TODO(gburd): what about select ResultSet, Tuple... etc.? + Optional, E>> optionalCachedResult = uow.cacheLookup(facet.hashCode()); + if (optionalCachedResult.isPresent()) { + uowCacheHits.mark(); + logger.info("UnitOfWork({}) cache hit for facet: {} with key: {}", uow.hashCode(), facet.toString(), facet.hashCode()); + Either, E> eitherCachedResult = optionalCachedResult.get(); + if (eitherCachedResult.isRight()) { + E cachedResult = eitherCachedResult.getRight(); + result = Optional.of(cachedResult); + } + break; + } + } + } else { + // The operation didn't provide enough information to uniquely identify the entity object + // using one of the facets, but that doesn't mean a filtering query won't return a proper + // result. Look in the cache to see if this statement has been executed before. + stmtKey = getStatementCacheKey(); + Optional, E>> optionalCachedResult = uow.cacheLookup(stmtKey.hashCode()); + if (optionalCachedResult.isPresent()) { + Either, E> eitherCachedResult = optionalCachedResult.get(); + if (eitherCachedResult.isLeft()) { + Set cachedResult = eitherCachedResult.getLeft(); + // Ensure that this non-indexed selection uniquely identified an Entity. + if (!(cachedResult.isEmpty() || cachedResult.size() > 1)) { + uowCacheHits.mark(); + logger.info("UnitOfWork({}) cache hit for stmt {} {}", uow.hashCode(), stmtKey, + stmtKey.hashCode()); + result = cachedResult.stream().findFirst(); + } + } + } } if (result == null) { + uowCacheMiss.mark(); ResultSet resultSet = execute(sessionOps, uow, traceContext, showValues, true); result = transform(resultSet); - if (key != null) { - if (result.isPresent()) { + if (enableCache && result.isPresent()) { + // If we executed a query that didn't depend on an we have a stmtKey for the filters, add that to the cache. + if (stmtKey != null) { + Set set = new HashSet(1); + set.add(result.get()); + uow.getCache().put(stmtKey.hashCode(), set); + } + // Now insert this entity into the cache for each facet for this entity that we can fully bind. + E entity = result.get(); + Map facetMap = Helenus.entity(result.get().getClass()).getIdentityFacets(); + facetMap.forEach((facetName, facet) -> { + EntityIdentifyingFacet boundFacet = null; + if (!facet.isFullyBound()) { + boundFacet = new EntityIdentifyingFacet(facet); + for (HelenusProperty prop : facet.getUnboundEntityProperties()) { + Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(entity, -1, prop); + if (value == null) { break; } + boundFacet.setValueForProperty(prop, value); + } + } + if (boundFacet != null && boundFacet.isFullyBound()) { + uow.getCache().put(boundFacet.hashCode(), Either) + } + }); Set set = new HashSet(1); set.add(result.get()); uow.getCache().put(key, set); diff --git a/src/main/java/net/helenus/core/operation/Operation.java b/src/main/java/net/helenus/core/operation/Operation.java index d349ada..d6e1b48 100644 --- a/src/main/java/net/helenus/core/operation/Operation.java +++ b/src/main/java/net/helenus/core/operation/Operation.java @@ -1,5 +1,6 @@ package net.helenus.core.operation; +import java.util.Set; import java.util.concurrent.ExecutionException; import com.codahale.metrics.Meter; @@ -14,7 +15,7 @@ import brave.Tracer; import brave.propagation.TraceContext; import net.helenus.core.AbstractSessionOperations; import net.helenus.core.UnitOfWork; - +import net.helenus.core.cache.EntityIdentifyingFacet; public abstract class Operation { @@ -68,6 +69,6 @@ public abstract class Operation { public Statement buildStatement(boolean cached) { return null; } - public String getStatementCacheKey() { return null; } + public Set getIdentifyingFacets() { return null; } } diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index 05403d1..63619b6 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -15,7 +15,6 @@ */ package net.helenus.core.operation; -import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.querybuilder.BuiltStatement; import com.datastax.driver.core.querybuilder.Ordering; @@ -31,8 +30,11 @@ import java.util.stream.StreamSupport; import com.google.common.base.Joiner; import com.google.common.collect.Iterables; import net.helenus.core.*; +import net.helenus.core.cache.Facet; +import net.helenus.core.cache.EntityIdentifyingFacet; import net.helenus.core.reflect.HelenusPropertyNode; import net.helenus.mapping.HelenusEntity; +import net.helenus.mapping.HelenusProperty; import net.helenus.mapping.MappingUtil; import net.helenus.mapping.OrderingDirection; import net.helenus.mapping.value.ColumnValueProvider; @@ -181,30 +183,26 @@ public final class SelectOperation extends AbstractFilterStreamOperation keys = new ArrayList<>(filters.size()); + public Set getIdentityFacets() { HelenusEntity entity = props.get(0).getEntity(); - - for (HelenusPropertyNode prop : props) { - switch (prop.getProperty().getColumnType()) { - case PARTITION_KEY: - case CLUSTERING_COLUMN: - - Filter filter = filters.get(prop.getProperty()); - if (filter != null) { - keys.add(filter.toString()); - } else { - return null; + final Set facets = new HashSet<>(filters.size()); + // Check to see if this select statement has enough information to build one or + // more identifying facets. + entity.getIdentityFacets().forEach((facetName, facet) -> { + EntityIdentifyingFacet boundFacet = null; + if (!facet.isFullyBound()) { + boundFacet = new EntityIdentifyingFacet(facet); + for (HelenusProperty prop : facet.getUnboundEntityProperties()) { + Filter filter = filters.get(facet.getProperty()); + if (filter == null) { break; } + boundFacet.setValueForProperty(prop, filter.toString()); } - break; - default: - if (keys.size() > 0) { - return entity.getName() + ": " + Joiner.on(",").join(keys); - } - return null; } - } - return null; + if (boundFacet != null && boundFacet.isFullyBound()) { + facets.add(boundFacet); + } + }); + return facets; } @Override diff --git a/src/main/java/net/helenus/mapping/HelenusEntity.java b/src/main/java/net/helenus/mapping/HelenusEntity.java index 6953d2e..ef63078 100644 --- a/src/main/java/net/helenus/mapping/HelenusEntity.java +++ b/src/main/java/net/helenus/mapping/HelenusEntity.java @@ -15,7 +15,10 @@ */ package net.helenus.mapping; +import net.helenus.core.cache.EntityIdentifyingFacet; + import java.util.Collection; +import java.util.Map; public interface HelenusEntity { @@ -30,4 +33,7 @@ public interface HelenusEntity { Collection getOrderedProperties(); HelenusProperty getProperty(String name); + + Map getIdentityFacets(); + } diff --git a/src/main/java/net/helenus/mapping/HelenusMappingEntity.java b/src/main/java/net/helenus/mapping/HelenusMappingEntity.java index f1d57c3..eaee3fe 100644 --- a/src/main/java/net/helenus/mapping/HelenusMappingEntity.java +++ b/src/main/java/net/helenus/mapping/HelenusMappingEntity.java @@ -20,9 +20,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.lang.reflect.Method; import java.util.*; + import net.helenus.config.HelenusSettings; import net.helenus.core.Helenus; import net.helenus.core.annotation.Cacheable; +import net.helenus.core.cache.EntityIdentifyingFacet; import net.helenus.mapping.annotation.*; import net.helenus.support.HelenusMappingException; import org.apache.commons.lang3.ClassUtils; @@ -36,6 +38,9 @@ public final class HelenusMappingEntity implements HelenusEntity { private final ImmutableMap methods; private final ImmutableMap props; private final ImmutableList orderedProps; + private final EntityIdentifyingFacet primaryIdentityFacet; + private final ImmutableMap allIdentityFacets; + private final ImmutableMap ancillaryIdentityFacets; public HelenusMappingEntity(Class iface, Metadata metadata) { this(iface, autoDetectType(iface), metadata); @@ -101,7 +106,35 @@ public final class HelenusMappingEntity implements HelenusEntity { validateOrdinals(); + // Caching cacheable = (null != iface.getDeclaredAnnotation(Cacheable.class)); + + ImmutableMap.Builder allFacetsBuilder = ImmutableMap.builder(); + ImmutableMap.Builder ancillaryFacetsBuilder = ImmutableMap.builder(); + EntityIdentifyingFacet primaryFacet = null; + List primaryProperties = new ArrayList(4); + for (HelenusProperty prop : propsLocal) { + switch(prop.getColumnType()) { + case PARTITION_KEY: + case CLUSTERING_COLUMN: + primaryProperties.add(prop); + break; + default: + if (primaryProperties != null) { + primaryFacet = new EntityIdentifyingFacet(keyspace, table, schemaVersion, primaryProperties.toArray(new HelenusProperty[props.size()])); + allFacetsBuilder.put("*", primaryFacet); + primaryProperties = null; + } + Optional optionalIndexName = prop.getIndexName(); + if (optionalIndexName.isPresent()) { + EntityIdentifyingFacet facet = new EntityIdentifyingFacet(keyspace, table, schemaVersion, prop); + ancillaryFacetsBuilder.put(prop.getPropertyName(), facet); + } + } + } + this.primaryIdentityFacet = primaryFacet; + this.ancillaryIdentityFacets = ancillaryFacetsBuilder.build(); + this.allIdentityFacets = allFacetsBuilder.build(); } @Override