Move the Guava JCache provider into the test targets only, don't assume a CacheManager instance exists.

This commit is contained in:
Greg Burd 2018-02-08 14:46:31 -05:00
parent d69d8a3b1e
commit 76b603f3d3
8 changed files with 330 additions and 181 deletions

View file

@ -36,7 +36,6 @@
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
<orderEntry type="library" name="Maven: javax.cache:cache-api:1.1.0" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:20.0" level="project" />
<orderEntry type="library" name="Maven: ca.exprofesso:guava-jcache:1.0.4" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.java:zipkin:1.29.2" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.brave:brave:4.0.6" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.reporter:zipkin-reporter:0.6.12" level="project" />
@ -118,6 +117,7 @@
<orderEntry type="library" scope="TEST" name="Maven: org.caffinitas.ohc:ohc-core:0.4.4" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.github.ben-manes.caffeine:caffeine:2.2.6" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.jctools:jctools-core:1.2.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: ca.exprofesso:guava-jcache:1.0.4" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: commons-io:commons-io:2.5" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-library:1.3" level="project" />

34
pom.xml
View file

@ -154,22 +154,6 @@
<version>20.0</version>
</dependency>
<dependency>
<groupId>ca.exprofesso</groupId>
<artifactId>guava-jcache</artifactId>
<version>1.0.4</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Metrics and tracing -->
<dependency>
<groupId>io.zipkin.java</groupId>
@ -239,6 +223,24 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>ca.exprofesso</groupId>
<artifactId>guava-jcache</artifactId>
<version>1.0.4</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>

View file

@ -28,10 +28,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheManager;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.operation.AbstractOperation;
@ -320,7 +318,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
if (facet.alone()) {
String columnName = facet.name() + "==" + facet.value();
if (result == null) result = cache.get(tableName, columnName);
cache.put(tableName, columnName, Either.left(value));
cache.put(tableName, columnName, Either.left(value));
}
}
}
@ -400,29 +398,29 @@ public abstract class AbstractUnitOfWork<E extends Exception>
});
// Merge our statement cache into the session cache if it exists.
CacheManager cacheManager = session.getCacheManager();
if (cacheManager != null) {
for (Map.Entry<String, Object> entry : statementCache.entrySet()) {
String[] keyParts = entry.getKey().split("\\.");
if (keyParts.length == 2) {
String cacheName = keyParts[0];
String key = keyParts[1];
if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) {
Cache<Object, Object> cache = cacheManager.getCache(cacheName);
if (cache != null) {
Object value = entry.getValue();
if (value == deleted) {
cache.remove(key);
} else {
cache.put(key.toString(), value);
}
}
}
CacheManager cacheManager = session.getCacheManager();
if (cacheManager != null) {
for (Map.Entry<String, Object> entry : statementCache.entrySet()) {
String[] keyParts = entry.getKey().split("\\.");
if (keyParts.length == 2) {
String cacheName = keyParts[0];
String key = keyParts[1];
if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) {
Cache<Object, Object> cache = cacheManager.getCache(cacheName);
if (cache != null) {
Object value = entry.getValue();
if (value == deleted) {
cache.remove(key);
} else {
cache.put(key.toString(), value);
}
}
}
}
}
}
// Merge our cache into the session cache.
// Merge our cache into the session cache.
session.mergeCache(cache);
// Spoil any lingering futures that may be out there.

View file

@ -15,11 +15,24 @@
*/
package net.helenus.core;
import static net.helenus.core.Query.eq;
import brave.Tracer;
import ca.exprofesso.guava.jcache.GuavaCachingProvider;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*;
import com.google.common.collect.Table;
import java.io.Closeable;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheManager;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.UnboundFacet;
@ -38,23 +51,6 @@ import net.helenus.support.Fun.Tuple6;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.spi.CachingProvider;
import java.io.Closeable;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static net.helenus.core.Query.eq;
public class HelenusSession extends AbstractSessionOperations implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(HelenusSession.class);
@ -82,21 +78,21 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
private volatile boolean showValues;
HelenusSession(
Session session,
String usingKeyspace,
CodecRegistry registry,
boolean showCql,
boolean showValues,
PrintStream printStream,
SessionRepositoryBuilder sessionRepositoryBuilder,
Executor executor,
boolean dropSchemaOnClose,
ConsistencyLevel consistencyLevel,
boolean defaultQueryIdempotency,
Class<? extends UnitOfWork> unitOfWorkClass,
CacheManager cacheManager,
MetricRegistry metricRegistry,
Tracer tracer) {
Session session,
String usingKeyspace,
CodecRegistry registry,
boolean showCql,
boolean showValues,
PrintStream printStream,
SessionRepositoryBuilder sessionRepositoryBuilder,
Executor executor,
boolean dropSchemaOnClose,
ConsistencyLevel consistencyLevel,
boolean defaultQueryIdempotency,
Class<? extends UnitOfWork> unitOfWorkClass,
CacheManager cacheManager,
MetricRegistry metricRegistry,
Tracer tracer) {
this.session = session;
this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry;
this.usingKeyspace =
@ -114,13 +110,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
this.unitOfWorkClass = unitOfWorkClass;
this.metricRegistry = metricRegistry;
this.zipkinTracer = tracer;
if (cacheManager == null) {
CachingProvider cachingProvider = Caching.getCachingProvider(GuavaCachingProvider.class.getName());
this.cacheManager = cachingProvider.getCacheManager();
} else {
this.cacheManager = cacheManager;
}
this.cacheManager = cacheManager;
this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository);
@ -220,12 +210,14 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
@Override
public Object checkCache(String tableName, List<Facet> facets) {
Object result = null;
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
for (String key : CacheUtil.flatKeys(tableName, facets)) {
result = cache.get(key);
if (result != null) {
return result;
if (cacheManager != null) {
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
for (String key : CacheUtil.flatKeys(tableName, facets)) {
result = cache.get(key);
if (result != null) {
return result;
}
}
}
}
@ -234,10 +226,12 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
@Override
public void cacheEvict(List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets);
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
CacheUtil.flatKeys(tableName, facets).forEach(key -> cache.remove(key));
if (cacheManager != null) {
String tableName = CacheUtil.schemaName(facets);
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
CacheUtil.flatKeys(tableName, facets).forEach(key -> cache.remove(key));
}
}
}
@ -276,93 +270,90 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
@Override
public void mergeCache(Table<String, String, Either<Object, List<Facet>>> uowCache) {
if (cacheManager == null) {
return;
}
List<Object> items =
uowCache
.values()
.stream()
.filter(Either::isLeft)
.map(Either::getLeft)
.distinct()
.collect(Collectors.toList());
for (Object pojo : items) {
HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
Map<String, Object> valueMap =
pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
if (entity.isCacheable()) {
List<Facet> boundFacets = new ArrayList<>();
String tableName = CacheUtil.schemaName(boundFacets);
for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet
.getProperties()
.forEach(
prop -> {
if (valueMap == null) {
Object value =
BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop);
binder.setValueForProperty(prop, value.toString());
} else {
Object v = valueMap.get(prop.getPropertyName());
if (v != null) {
binder.setValueForProperty(prop, v.toString());
if (cacheManager != null) {
List<Object> items =
uowCache
.values()
.stream()
.filter(Either::isLeft)
.map(Either::getLeft)
.distinct()
.collect(Collectors.toList());
for (Object pojo : items) {
HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
Map<String, Object> valueMap =
pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
if (entity.isCacheable()) {
List<Facet> boundFacets = new ArrayList<>();
String tableName = CacheUtil.schemaName(boundFacets);
for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet
.getProperties()
.forEach(
prop -> {
if (valueMap == null) {
Object value =
BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop);
binder.setValueForProperty(prop, value.toString());
} else {
Object v = valueMap.get(prop.getPropertyName());
if (v != null) {
binder.setValueForProperty(prop, v.toString());
}
}
}
});
if (binder.isBound()) {
boundFacets.add(binder.bind());
});
if (binder.isBound()) {
boundFacets.add(binder.bind());
}
} else {
boundFacets.add(facet);
}
} else {
boundFacets.add(facet);
}
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
replaceCachedFacetValues(pojo, tableName, facetCombinations);
}
}
List<List<Facet>> deletedFacetSets =
uowCache
.values()
.stream()
.filter(Either::isRight)
.map(Either::getRight)
.collect(Collectors.toList());
for (List<Facet> facets : deletedFacetSets) {
String tableName = CacheUtil.schemaName(facets);
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
List<String> keys = CacheUtil.flatKeys(tableName, facets);
keys.forEach(key -> cache.remove(key));
}
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
replaceCachedFacetValues(pojo, tableName, facetCombinations);
}
}
if (cacheManager != null) {
List<List<Facet>> deletedFacetSets =
uowCache
.values()
.stream()
.filter(Either::isRight)
.map(Either::getRight)
.collect(Collectors.toList());
for (List<Facet> facets : deletedFacetSets) {
String tableName = CacheUtil.schemaName(facets);
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
List<String> keys = CacheUtil.flatKeys(tableName, facets);
keys.forEach(key -> cache.remove(key));
}
}
}
}
private void replaceCachedFacetValues(
Object pojo, String tableName, List<String[]> facetCombinations) {
for (String[] combination : facetCombinations) {
if (cacheManager != null) {
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
if (cacheManager != null) {
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
if (pojo == null || pojo == HelenusSession.deleted) {
cache.remove(cacheKey);
} else {
cache.put(cacheKey, pojo);
}
}
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
if (pojo == null || pojo == HelenusSession.deleted) {
cache.remove(cacheKey);
} else {
cache.put(cacheKey, pojo);
}
}
}
}
}
public CacheManager getCacheManager() {
return cacheManager;
return cacheManager;
}
public Metadata getMetadata() {

View file

@ -19,6 +19,13 @@ import brave.Tracer;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import javax.cache.CacheManager;
import net.helenus.core.reflect.DslExportable;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusEntityType;
@ -29,14 +36,6 @@ import net.helenus.support.Either;
import net.helenus.support.HelenusException;
import net.helenus.support.PackageUtil;
import javax.cache.CacheManager;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
public final class SessionInitializer extends AbstractSessionOperations {
private final Session session;

View file

@ -0,0 +1,155 @@
package net.helenus.core.cache;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Configuration;
import javax.cache.integration.CompletionListener;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
public class MapCache<K, V> implements Cache<K, V> {
private Map<K, V> map = new HashMap<K, V>();
@Override
public V get(K key) {
return map.get(key);
}
@Override
public Map<K, V> getAll(Set<? extends K> keys) {
Map<K, V> result = new HashMap<K, V>(keys.size());
for (K key : keys) {
V value = map.get(key);
if (value != null) {
result.put(key, value);
}
}
return result;
}
@Override
public boolean containsKey(K key) {
return map.containsKey(key);
}
@Override
public void loadAll(
Set<? extends K> keys,
boolean replaceExistingValues,
CompletionListener completionListener) {}
@Override
public void put(K key, V value) {}
@Override
public V getAndPut(K key, V value) {
return null;
}
@Override
public void putAll(Map<? extends K, ? extends V> map) {}
@Override
public boolean putIfAbsent(K key, V value) {
return false;
}
@Override
public boolean remove(K key) {
return false;
}
@Override
public boolean remove(K key, V oldValue) {
return false;
}
@Override
public V getAndRemove(K key) {
return null;
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
return false;
}
@Override
public boolean replace(K key, V value) {
return false;
}
@Override
public V getAndReplace(K key, V value) {
return null;
}
@Override
public void removeAll(Set<? extends K> keys) {}
@Override
public void removeAll() {}
@Override
public void clear() {}
@Override
public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
return null;
}
@Override
public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments)
throws EntryProcessorException {
return null;
}
@Override
public <T> Map<K, EntryProcessorResult<T>> invokeAll(
Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... arguments) {
return null;
}
@Override
public String getName() {
return null;
}
@Override
public CacheManager getCacheManager() {
return null;
}
@Override
public void close() {}
@Override
public boolean isClosed() {
return false;
}
@Override
public <T> T unwrap(Class<T> clazz) {
return null;
}
@Override
public void registerCacheEntryListener(
CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {}
@Override
public void deregisterCacheEntryListener(
CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {}
@Override
public Iterator<Entry<K, V>> iterator() {
return null;
}
}

View file

@ -16,10 +16,10 @@
package net.helenus.core.operation;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.AtomicMonotonicTimestampGenerator;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.TimestampGenerator;
import com.datastax.driver.core.AtomicMonotonicTimestampGenerator;
import com.google.common.base.Stopwatch;
import java.util.ArrayList;
import java.util.List;
@ -31,7 +31,8 @@ import net.helenus.support.HelenusException;
public class BatchOperation extends Operation<Long> {
//TODO(gburd): find the way to get the driver's timestamp generator
private static final TimestampGenerator timestampGenerator = new AtomicMonotonicTimestampGenerator();
private static final TimestampGenerator timestampGenerator =
new AtomicMonotonicTimestampGenerator();
private final BatchStatement batch;
private List<AbstractOperation<?, ?>> operations = new ArrayList<AbstractOperation<?, ?>>();

View file

@ -17,15 +17,16 @@ package net.helenus.test.integration.core.unitofwork;
import static net.helenus.core.Query.eq;
import ca.exprofesso.guava.jcache.GuavaCachingProvider;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.utils.UUIDs;
import java.io.Serializable;
import java.util.Date;
import java.util.UUID;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;
import net.bytebuddy.utility.RandomString;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
@ -71,6 +72,14 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
@BeforeClass
public static void beforeTest() {
CachingProvider cachingProvider =
Caching.getCachingProvider(GuavaCachingProvider.class.getName());
CacheManager cacheManager = cachingProvider.getCacheManager();
MutableConfiguration<String, Object> configuration = new MutableConfiguration<>();
configuration.setStoreByValue(false).setReadThrough(false);
cacheManager.createCache(
MappingUtil.getTableName(Widget.class, true).toString(), configuration);
session =
Helenus.init(getSession())
.showCql()
@ -78,15 +87,9 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.autoCreateDrop()
.consistencyLevel(ConsistencyLevel.ONE)
.idempotentQueryExecution(true)
.setCacheManager(cacheManager)
.get();
widget = session.dsl(Widget.class);
MutableConfiguration<String, Object> configuration = new MutableConfiguration<>();
configuration
.setStoreByValue(false)
.setReadThrough(false);
CacheManager cacheManager = session.getCacheManager();
cacheManager.createCache(MappingUtil.getTableName(Widget.class, true).toString(), configuration);
}
@Test
@ -469,8 +472,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.value(widget::name, RandomString.make(20))
.sync(uow);
String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key1.toString();
uow.cacheUpdate(cacheKey, w1);
String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key1.toString();
uow.cacheUpdate(cacheKey, w1);
/*
w2 = session.<Widget>upsert(w1)
.value(widget::a, RandomString.make(10))