diff --git a/helenus-core.iml b/helenus-core.iml index 7b75a06..2578277 100644 --- a/helenus-core.iml +++ b/helenus-core.iml @@ -36,7 +36,6 @@ - @@ -118,6 +117,7 @@ + diff --git a/pom.xml b/pom.xml index d6a9172..0011e00 100644 --- a/pom.xml +++ b/pom.xml @@ -154,22 +154,6 @@ 20.0 - - ca.exprofesso - guava-jcache - 1.0.4 - - - com.google.guava - guava - - - javax.cache - cache-api - - - - io.zipkin.java @@ -239,6 +223,24 @@ test + + + ca.exprofesso + guava-jcache + 1.0.4 + + + com.google.guava + guava + + + javax.cache + cache-api + + + test + + commons-io commons-io diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index 22491f7..46ad287 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -28,10 +28,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; - import javax.cache.Cache; import javax.cache.CacheManager; - import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; import net.helenus.core.operation.AbstractOperation; @@ -320,7 +318,7 @@ public abstract class AbstractUnitOfWork if (facet.alone()) { String columnName = facet.name() + "==" + facet.value(); if (result == null) result = cache.get(tableName, columnName); - cache.put(tableName, columnName, Either.left(value)); + cache.put(tableName, columnName, Either.left(value)); } } } @@ -400,29 +398,29 @@ public abstract class AbstractUnitOfWork }); // Merge our statement cache into the session cache if it exists. - CacheManager cacheManager = session.getCacheManager(); - if (cacheManager != null) { - for (Map.Entry entry : statementCache.entrySet()) { - String[] keyParts = entry.getKey().split("\\."); - if (keyParts.length == 2) { - String cacheName = keyParts[0]; - String key = keyParts[1]; - if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) { - Cache cache = cacheManager.getCache(cacheName); - if (cache != null) { - Object value = entry.getValue(); - if (value == deleted) { - cache.remove(key); - } else { - cache.put(key.toString(), value); - } - } - } + CacheManager cacheManager = session.getCacheManager(); + if (cacheManager != null) { + for (Map.Entry entry : statementCache.entrySet()) { + String[] keyParts = entry.getKey().split("\\."); + if (keyParts.length == 2) { + String cacheName = keyParts[0]; + String key = keyParts[1]; + if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) { + Cache cache = cacheManager.getCache(cacheName); + if (cache != null) { + Object value = entry.getValue(); + if (value == deleted) { + cache.remove(key); + } else { + cache.put(key.toString(), value); } + } } + } } + } - // Merge our cache into the session cache. + // Merge our cache into the session cache. session.mergeCache(cache); // Spoil any lingering futures that may be out there. diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index 48fd2d1..02734b9 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -15,11 +15,24 @@ */ package net.helenus.core; +import static net.helenus.core.Query.eq; + import brave.Tracer; -import ca.exprofesso.guava.jcache.GuavaCachingProvider; import com.codahale.metrics.MetricRegistry; import com.datastax.driver.core.*; import com.google.common.collect.Table; +import java.io.Closeable; +import java.io.PrintStream; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.*; +import java.util.concurrent.Executor; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import javax.cache.Cache; +import javax.cache.CacheManager; import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; import net.helenus.core.cache.UnboundFacet; @@ -38,23 +51,6 @@ import net.helenus.support.Fun.Tuple6; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.cache.Cache; -import javax.cache.CacheManager; -import javax.cache.Caching; -import javax.cache.spi.CachingProvider; -import java.io.Closeable; -import java.io.PrintStream; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.*; -import java.util.concurrent.Executor; -import java.util.function.Function; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import static net.helenus.core.Query.eq; - public class HelenusSession extends AbstractSessionOperations implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(HelenusSession.class); @@ -82,21 +78,21 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab private volatile boolean showValues; HelenusSession( - Session session, - String usingKeyspace, - CodecRegistry registry, - boolean showCql, - boolean showValues, - PrintStream printStream, - SessionRepositoryBuilder sessionRepositoryBuilder, - Executor executor, - boolean dropSchemaOnClose, - ConsistencyLevel consistencyLevel, - boolean defaultQueryIdempotency, - Class unitOfWorkClass, - CacheManager cacheManager, - MetricRegistry metricRegistry, - Tracer tracer) { + Session session, + String usingKeyspace, + CodecRegistry registry, + boolean showCql, + boolean showValues, + PrintStream printStream, + SessionRepositoryBuilder sessionRepositoryBuilder, + Executor executor, + boolean dropSchemaOnClose, + ConsistencyLevel consistencyLevel, + boolean defaultQueryIdempotency, + Class unitOfWorkClass, + CacheManager cacheManager, + MetricRegistry metricRegistry, + Tracer tracer) { this.session = session; this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry; this.usingKeyspace = @@ -114,13 +110,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab this.unitOfWorkClass = unitOfWorkClass; this.metricRegistry = metricRegistry; this.zipkinTracer = tracer; - - if (cacheManager == null) { - CachingProvider cachingProvider = Caching.getCachingProvider(GuavaCachingProvider.class.getName()); - this.cacheManager = cachingProvider.getCacheManager(); - } else { - this.cacheManager = cacheManager; - } + this.cacheManager = cacheManager; this.valueProvider = new RowColumnValueProvider(this.sessionRepository); this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository); @@ -220,12 +210,14 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab @Override public Object checkCache(String tableName, List facets) { Object result = null; - Cache cache = cacheManager.getCache(tableName); - if (cache != null) { - for (String key : CacheUtil.flatKeys(tableName, facets)) { - result = cache.get(key); - if (result != null) { - return result; + if (cacheManager != null) { + Cache cache = cacheManager.getCache(tableName); + if (cache != null) { + for (String key : CacheUtil.flatKeys(tableName, facets)) { + result = cache.get(key); + if (result != null) { + return result; + } } } } @@ -234,10 +226,12 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab @Override public void cacheEvict(List facets) { - String tableName = CacheUtil.schemaName(facets); - Cache cache = cacheManager.getCache(tableName); - if (cache != null) { - CacheUtil.flatKeys(tableName, facets).forEach(key -> cache.remove(key)); + if (cacheManager != null) { + String tableName = CacheUtil.schemaName(facets); + Cache cache = cacheManager.getCache(tableName); + if (cache != null) { + CacheUtil.flatKeys(tableName, facets).forEach(key -> cache.remove(key)); + } } } @@ -276,93 +270,90 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab @Override public void mergeCache(Table>> uowCache) { - if (cacheManager == null) { - return; - } - List items = - uowCache - .values() - .stream() - .filter(Either::isLeft) - .map(Either::getLeft) - .distinct() - .collect(Collectors.toList()); - for (Object pojo : items) { - HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo)); - Map valueMap = - pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; - if (entity.isCacheable()) { - List boundFacets = new ArrayList<>(); - String tableName = CacheUtil.schemaName(boundFacets); - for (Facet facet : entity.getFacets()) { - if (facet instanceof UnboundFacet) { - UnboundFacet unboundFacet = (UnboundFacet) facet; - UnboundFacet.Binder binder = unboundFacet.binder(); - unboundFacet - .getProperties() - .forEach( - prop -> { - if (valueMap == null) { - Object value = - BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop); - binder.setValueForProperty(prop, value.toString()); - } else { - Object v = valueMap.get(prop.getPropertyName()); - if (v != null) { - binder.setValueForProperty(prop, v.toString()); + if (cacheManager != null) { + List items = + uowCache + .values() + .stream() + .filter(Either::isLeft) + .map(Either::getLeft) + .distinct() + .collect(Collectors.toList()); + for (Object pojo : items) { + HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo)); + Map valueMap = + pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; + if (entity.isCacheable()) { + List boundFacets = new ArrayList<>(); + String tableName = CacheUtil.schemaName(boundFacets); + for (Facet facet : entity.getFacets()) { + if (facet instanceof UnboundFacet) { + UnboundFacet unboundFacet = (UnboundFacet) facet; + UnboundFacet.Binder binder = unboundFacet.binder(); + unboundFacet + .getProperties() + .forEach( + prop -> { + if (valueMap == null) { + Object value = + BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop); + binder.setValueForProperty(prop, value.toString()); + } else { + Object v = valueMap.get(prop.getPropertyName()); + if (v != null) { + binder.setValueForProperty(prop, v.toString()); + } } - } - }); - if (binder.isBound()) { - boundFacets.add(binder.bind()); + }); + if (binder.isBound()) { + boundFacets.add(binder.bind()); + } + } else { + boundFacets.add(facet); } - } else { - boundFacets.add(facet); } + List facetCombinations = CacheUtil.flattenFacets(boundFacets); + replaceCachedFacetValues(pojo, tableName, facetCombinations); + } + } + + List> deletedFacetSets = + uowCache + .values() + .stream() + .filter(Either::isRight) + .map(Either::getRight) + .collect(Collectors.toList()); + for (List facets : deletedFacetSets) { + String tableName = CacheUtil.schemaName(facets); + Cache cache = cacheManager.getCache(tableName); + if (cache != null) { + List keys = CacheUtil.flatKeys(tableName, facets); + keys.forEach(key -> cache.remove(key)); } - List facetCombinations = CacheUtil.flattenFacets(boundFacets); - replaceCachedFacetValues(pojo, tableName, facetCombinations); } } - - if (cacheManager != null) { - List> deletedFacetSets = - uowCache - .values() - .stream() - .filter(Either::isRight) - .map(Either::getRight) - .collect(Collectors.toList()); - for (List facets : deletedFacetSets) { - String tableName = CacheUtil.schemaName(facets); - Cache cache = cacheManager.getCache(tableName); - if (cache != null) { - List keys = CacheUtil.flatKeys(tableName, facets); - keys.forEach(key -> cache.remove(key)); - } - } - } } private void replaceCachedFacetValues( Object pojo, String tableName, List facetCombinations) { - for (String[] combination : facetCombinations) { + if (cacheManager != null) { + for (String[] combination : facetCombinations) { String cacheKey = tableName + "." + Arrays.toString(combination); - if (cacheManager != null) { - Cache cache = cacheManager.getCache(tableName); - if (cache != null) { - if (pojo == null || pojo == HelenusSession.deleted) { - cache.remove(cacheKey); - } else { - cache.put(cacheKey, pojo); - } - } + Cache cache = cacheManager.getCache(tableName); + if (cache != null) { + if (pojo == null || pojo == HelenusSession.deleted) { + cache.remove(cacheKey); + } else { + cache.put(cacheKey, pojo); + } } + } } } public CacheManager getCacheManager() { - return cacheManager; + return cacheManager; } public Metadata getMetadata() { diff --git a/src/main/java/net/helenus/core/SessionInitializer.java b/src/main/java/net/helenus/core/SessionInitializer.java index 26c921b..861cbf1 100644 --- a/src/main/java/net/helenus/core/SessionInitializer.java +++ b/src/main/java/net/helenus/core/SessionInitializer.java @@ -19,6 +19,13 @@ import brave.Tracer; import com.codahale.metrics.MetricRegistry; import com.datastax.driver.core.*; import com.google.common.util.concurrent.MoreExecutors; +import java.io.IOException; +import java.io.PrintStream; +import java.util.*; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.Consumer; +import javax.cache.CacheManager; import net.helenus.core.reflect.DslExportable; import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.HelenusEntityType; @@ -29,14 +36,6 @@ import net.helenus.support.Either; import net.helenus.support.HelenusException; import net.helenus.support.PackageUtil; -import javax.cache.CacheManager; -import java.io.IOException; -import java.io.PrintStream; -import java.util.*; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.function.Consumer; - public final class SessionInitializer extends AbstractSessionOperations { private final Session session; diff --git a/src/main/java/net/helenus/core/cache/MapCache.java b/src/main/java/net/helenus/core/cache/MapCache.java new file mode 100644 index 0000000..3082081 --- /dev/null +++ b/src/main/java/net/helenus/core/cache/MapCache.java @@ -0,0 +1,155 @@ +package net.helenus.core.cache; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.configuration.CacheEntryListenerConfiguration; +import javax.cache.configuration.Configuration; +import javax.cache.integration.CompletionListener; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; + +public class MapCache implements Cache { + + private Map map = new HashMap(); + + @Override + public V get(K key) { + return map.get(key); + } + + @Override + public Map getAll(Set keys) { + Map result = new HashMap(keys.size()); + for (K key : keys) { + V value = map.get(key); + if (value != null) { + result.put(key, value); + } + } + return result; + } + + @Override + public boolean containsKey(K key) { + return map.containsKey(key); + } + + @Override + public void loadAll( + Set keys, + boolean replaceExistingValues, + CompletionListener completionListener) {} + + @Override + public void put(K key, V value) {} + + @Override + public V getAndPut(K key, V value) { + return null; + } + + @Override + public void putAll(Map map) {} + + @Override + public boolean putIfAbsent(K key, V value) { + return false; + } + + @Override + public boolean remove(K key) { + return false; + } + + @Override + public boolean remove(K key, V oldValue) { + return false; + } + + @Override + public V getAndRemove(K key) { + return null; + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + return false; + } + + @Override + public boolean replace(K key, V value) { + return false; + } + + @Override + public V getAndReplace(K key, V value) { + return null; + } + + @Override + public void removeAll(Set keys) {} + + @Override + public void removeAll() {} + + @Override + public void clear() {} + + @Override + public > C getConfiguration(Class clazz) { + return null; + } + + @Override + public T invoke(K key, EntryProcessor entryProcessor, Object... arguments) + throws EntryProcessorException { + return null; + } + + @Override + public Map> invokeAll( + Set keys, EntryProcessor entryProcessor, Object... arguments) { + return null; + } + + @Override + public String getName() { + return null; + } + + @Override + public CacheManager getCacheManager() { + return null; + } + + @Override + public void close() {} + + @Override + public boolean isClosed() { + return false; + } + + @Override + public T unwrap(Class clazz) { + return null; + } + + @Override + public void registerCacheEntryListener( + CacheEntryListenerConfiguration cacheEntryListenerConfiguration) {} + + @Override + public void deregisterCacheEntryListener( + CacheEntryListenerConfiguration cacheEntryListenerConfiguration) {} + + @Override + public Iterator> iterator() { + return null; + } +} diff --git a/src/main/java/net/helenus/core/operation/BatchOperation.java b/src/main/java/net/helenus/core/operation/BatchOperation.java index 35288bc..a0662ee 100644 --- a/src/main/java/net/helenus/core/operation/BatchOperation.java +++ b/src/main/java/net/helenus/core/operation/BatchOperation.java @@ -16,10 +16,10 @@ package net.helenus.core.operation; import com.codahale.metrics.Timer; +import com.datastax.driver.core.AtomicMonotonicTimestampGenerator; import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.TimestampGenerator; -import com.datastax.driver.core.AtomicMonotonicTimestampGenerator; import com.google.common.base.Stopwatch; import java.util.ArrayList; import java.util.List; @@ -31,7 +31,8 @@ import net.helenus.support.HelenusException; public class BatchOperation extends Operation { //TODO(gburd): find the way to get the driver's timestamp generator - private static final TimestampGenerator timestampGenerator = new AtomicMonotonicTimestampGenerator(); + private static final TimestampGenerator timestampGenerator = + new AtomicMonotonicTimestampGenerator(); private final BatchStatement batch; private List> operations = new ArrayList>(); diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java index b548d02..a9b5fb7 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -17,15 +17,16 @@ package net.helenus.test.integration.core.unitofwork; import static net.helenus.core.Query.eq; +import ca.exprofesso.guava.jcache.GuavaCachingProvider; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.utils.UUIDs; import java.io.Serializable; import java.util.Date; import java.util.UUID; - import javax.cache.CacheManager; +import javax.cache.Caching; import javax.cache.configuration.MutableConfiguration; - +import javax.cache.spi.CachingProvider; import net.bytebuddy.utility.RandomString; import net.helenus.core.Helenus; import net.helenus.core.HelenusSession; @@ -71,6 +72,14 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { @BeforeClass public static void beforeTest() { + CachingProvider cachingProvider = + Caching.getCachingProvider(GuavaCachingProvider.class.getName()); + CacheManager cacheManager = cachingProvider.getCacheManager(); + MutableConfiguration configuration = new MutableConfiguration<>(); + configuration.setStoreByValue(false).setReadThrough(false); + cacheManager.createCache( + MappingUtil.getTableName(Widget.class, true).toString(), configuration); + session = Helenus.init(getSession()) .showCql() @@ -78,15 +87,9 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .autoCreateDrop() .consistencyLevel(ConsistencyLevel.ONE) .idempotentQueryExecution(true) + .setCacheManager(cacheManager) .get(); widget = session.dsl(Widget.class); - - MutableConfiguration configuration = new MutableConfiguration<>(); - configuration - .setStoreByValue(false) - .setReadThrough(false); - CacheManager cacheManager = session.getCacheManager(); - cacheManager.createCache(MappingUtil.getTableName(Widget.class, true).toString(), configuration); } @Test @@ -469,8 +472,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .value(widget::name, RandomString.make(20)) .sync(uow); - String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key1.toString(); - uow.cacheUpdate(cacheKey, w1); + String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key1.toString(); + uow.cacheUpdate(cacheKey, w1); /* w2 = session.upsert(w1) .value(widget::a, RandomString.make(10))