From d69d8a3b1e6d735de12e76016661ef76765c5671 Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Thu, 8 Feb 2018 10:09:23 -0500 Subject: [PATCH] Finish first steps of JCache integration, UnitOfWork statement cache now merges into available JCache at commit. --- NOTES | 8 +++ helenus-core.iml | 4 +- pom.xml | 10 ++++ .../net/helenus/core/AbstractUnitOfWork.java | 36 ++++++++++++-- .../java/net/helenus/core/HelenusSession.java | 49 ++++++++++--------- .../core/unitofwork/UnitOfWorkTest.java | 22 +++++++++ 6 files changed, 102 insertions(+), 27 deletions(-) diff --git a/NOTES b/NOTES index b2020d0..a2cb1f2 100644 --- a/NOTES +++ b/NOTES @@ -22,6 +22,14 @@ Operation/ `-- PreparedStreamOperation +---- +@CompoundIndex() +create a new col in the same table called __idx_a_b_c that the hash of the concatenated values in that order is stored, create a normal index for that (CREATE INDEX ...) +if a query matches that set of columns then use that indexed col to fetch the desired results from that table +could also work with .in() query if materialized view exists +---- + + // TODO(gburd): create a statement that matches one that wasn't prepared //String key = // "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString(); diff --git a/helenus-core.iml b/helenus-core.iml index 8e5de48..7b75a06 100644 --- a/helenus-core.iml +++ b/helenus-core.iml @@ -28,12 +28,15 @@ + + + @@ -117,7 +120,6 @@ - diff --git a/pom.xml b/pom.xml index c1a88e5..d6a9172 100644 --- a/pom.xml +++ b/pom.xml @@ -158,6 +158,16 @@ ca.exprofesso guava-jcache 1.0.4 + + + com.google.guava + guava + + + javax.cache + cache-api + + diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index 4e10550..22491f7 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -28,6 +28,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; + +import javax.cache.Cache; +import javax.cache.CacheManager; + import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; import net.helenus.core.operation.AbstractOperation; @@ -36,6 +40,7 @@ import net.helenus.mapping.MappingUtil; import net.helenus.support.Either; import net.helenus.support.HelenusException; import org.apache.commons.lang3.SerializationUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -262,7 +267,7 @@ public abstract class AbstractUnitOfWork @Override public void cacheDelete(String key) { - statementCache.replace(key, deleted); + statementCache.put(key, deleted); } @Override @@ -303,7 +308,7 @@ public abstract class AbstractUnitOfWork @Override public Object cacheUpdate(String key, Object value) { - return statementCache.replace(key, value); + return statementCache.put(key, value); } @Override @@ -315,7 +320,7 @@ public abstract class AbstractUnitOfWork if (facet.alone()) { String columnName = facet.name() + "==" + facet.value(); if (result == null) result = cache.get(tableName, columnName); - cache.put(tableName, columnName, Either.left(value)); + cache.put(tableName, columnName, Either.left(value)); } } } @@ -394,7 +399,30 @@ public abstract class AbstractUnitOfWork applyPostCommitFunctions("committed", uow.commitThunks); }); - // Merge our cache into the session cache. + // Merge our statement cache into the session cache if it exists. + CacheManager cacheManager = session.getCacheManager(); + if (cacheManager != null) { + for (Map.Entry entry : statementCache.entrySet()) { + String[] keyParts = entry.getKey().split("\\."); + if (keyParts.length == 2) { + String cacheName = keyParts[0]; + String key = keyParts[1]; + if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) { + Cache cache = cacheManager.getCache(cacheName); + if (cache != null) { + Object value = entry.getValue(); + if (value == deleted) { + cache.remove(key); + } else { + cache.put(key.toString(), value); + } + } + } + } + } + } + + // Merge our cache into the session cache. session.mergeCache(cache); // Spoil any lingering futures that may be out there. diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index 76f3509..48fd2d1 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -16,6 +16,7 @@ package net.helenus.core; import brave.Tracer; +import ca.exprofesso.guava.jcache.GuavaCachingProvider; import com.codahale.metrics.MetricRegistry; import com.datastax.driver.core.*; import com.google.common.collect.Table; @@ -40,7 +41,6 @@ import org.slf4j.LoggerFactory; import javax.cache.Cache; import javax.cache.CacheManager; import javax.cache.Caching; -import javax.cache.configuration.MutableConfiguration; import javax.cache.spi.CachingProvider; import java.io.Closeable; import java.io.PrintStream; @@ -57,8 +57,8 @@ import static net.helenus.core.Query.eq; public class HelenusSession extends AbstractSessionOperations implements Closeable { - public static final Object deleted = new Object(); private static final Logger LOG = LoggerFactory.getLogger(HelenusSession.class); + public static final Object deleted = new Object(); private static final Pattern classNameRegex = Pattern.compile("^(?:\\w+\\.)+(?:(\\w+)|(\\w+)\\$.*)$"); @@ -114,14 +114,12 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab this.unitOfWorkClass = unitOfWorkClass; this.metricRegistry = metricRegistry; this.zipkinTracer = tracer; - this.cacheManager = cacheManger; if (cacheManager == null) { - MutableConfiguration configuration = new MutableConfiguration<>(); - configuration.setStoreByValue(false); - configuration.setTypes(String.class, Object.class); - CachingProvider cachingProvider = Caching.getCachingProvider(GuavaCacheManager.class.getName()); - cacheManager = cachingProvider.getCacheManager(); + CachingProvider cachingProvider = Caching.getCachingProvider(GuavaCachingProvider.class.getName()); + this.cacheManager = cachingProvider.getCacheManager(); + } else { + this.cacheManager = cacheManager; } this.valueProvider = new RowColumnValueProvider(this.sessionRepository); @@ -278,6 +276,9 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab @Override public void mergeCache(Table>> uowCache) { + if (cacheManager == null) { + return; + } List items = uowCache .values() @@ -325,22 +326,22 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab } if (cacheManager != null) { - List> deletedFacetSets = - uowCache - .values() - .stream() - .filter(Either::isRight) - .map(Either::getRight) - .collect(Collectors.toList()); - for (List facets : deletedFacetSets) { - String tableName = CacheUtil.schemaName(facets); - Cache cache = cacheManager.getCache(tableName); - if (cache != null) { - List keys = CacheUtil.flatKeys(tableName, facets); - keys.forEach(key -> cache.remove(key)); - } + List> deletedFacetSets = + uowCache + .values() + .stream() + .filter(Either::isRight) + .map(Either::getRight) + .collect(Collectors.toList()); + for (List facets : deletedFacetSets) { + String tableName = CacheUtil.schemaName(facets); + Cache cache = cacheManager.getCache(tableName); + if (cache != null) { + List keys = CacheUtil.flatKeys(tableName, facets); + keys.forEach(key -> cache.remove(key)); } } + } } private void replaceCachedFacetValues( @@ -360,6 +361,10 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab } } + public CacheManager getCacheManager() { + return cacheManager; + } + public Metadata getMetadata() { return metadata; } diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java index 99d694d..b548d02 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -22,12 +22,17 @@ import com.datastax.driver.core.utils.UUIDs; import java.io.Serializable; import java.util.Date; import java.util.UUID; + +import javax.cache.CacheManager; +import javax.cache.configuration.MutableConfiguration; + import net.bytebuddy.utility.RandomString; import net.helenus.core.Helenus; import net.helenus.core.HelenusSession; import net.helenus.core.UnitOfWork; import net.helenus.core.annotation.Cacheable; import net.helenus.core.reflect.Entity; +import net.helenus.mapping.MappingUtil; import net.helenus.mapping.annotation.Constraints; import net.helenus.mapping.annotation.Index; import net.helenus.mapping.annotation.PartitionKey; @@ -75,6 +80,13 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .idempotentQueryExecution(true) .get(); widget = session.dsl(Widget.class); + + MutableConfiguration configuration = new MutableConfiguration<>(); + configuration + .setStoreByValue(false) + .setReadThrough(false); + CacheManager cacheManager = session.getCacheManager(); + cacheManager.createCache(MappingUtil.getTableName(Widget.class, true).toString(), configuration); } @Test @@ -332,8 +344,12 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { w2 = session.select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null); + String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString(); + uow.cacheUpdate(cacheKey, w1); + // This should remove the object from the cache. session.delete(widget).where(widget::id, eq(key)).sync(uow); + uow.cacheDelete(cacheKey); // This should fail to read from the cache. w3 = @@ -452,6 +468,9 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .value(widget::id, key1) .value(widget::name, RandomString.make(20)) .sync(uow); + + String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key1.toString(); + uow.cacheUpdate(cacheKey, w1); /* w2 = session.upsert(w1) .value(widget::a, RandomString.make(10)) @@ -484,9 +503,12 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .value(widget::d, RandomString.make(10)) .sync(uow); + String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString(); + uow.cacheUpdate(cacheKey, w1); // This should read from the cache and get the same instance of a Widget. w2 = session.select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null); + uow.cacheUpdate(cacheKey, w1); uow.commit() .andThen(