Integrate JCache for cached objects outside of a UnitOfWork.

This commit is contained in:
Greg Burd 2018-02-07 18:41:39 -05:00
parent 5215749de1
commit 6858cf6f48
5 changed files with 112 additions and 184 deletions

19
pom.xml
View file

@ -112,6 +112,12 @@
<version>3.3.2</version> <version>3.3.2</version>
</dependency> </dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.3.2</version>
</dependency>
<dependency> <dependency>
<groupId>com.diffplug.durian</groupId> <groupId>com.diffplug.durian</groupId>
<artifactId>durian</artifactId> <artifactId>durian</artifactId>
@ -136,12 +142,24 @@
<version>4.3.10.RELEASE</version> <version>4.3.10.RELEASE</version>
</dependency> </dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>1.1.0</version>
</dependency>
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<version>20.0</version> <version>20.0</version>
</dependency> </dependency>
<dependency>
<groupId>ca.exprofesso</groupId>
<artifactId>guava-jcache</artifactId>
<version>1.0.4</version>
</dependency>
<!-- Metrics and tracing --> <!-- Metrics and tracing -->
<dependency> <dependency>
<groupId>io.zipkin.java</groupId> <groupId>io.zipkin.java</groupId>
@ -259,7 +277,6 @@
<version>1.7.1</version> <version>1.7.1</version>
<scope>runtime</scope> <scope>runtime</scope>
</dependency> </dependency>
</dependencies> </dependencies>
<build> <build>

View file

@ -15,25 +15,12 @@
*/ */
package net.helenus.core; package net.helenus.core;
import static net.helenus.core.Query.eq;
import brave.Tracer; import brave.Tracer;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*; import com.datastax.driver.core.*;
import com.google.common.collect.Table; import com.google.common.collect.Table;
import java.io.Closeable;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet; import net.helenus.core.cache.Facet;
import net.helenus.core.cache.SessionCache;
import net.helenus.core.cache.UnboundFacet; import net.helenus.core.cache.UnboundFacet;
import net.helenus.core.operation.*; import net.helenus.core.operation.*;
import net.helenus.core.reflect.Drafted; import net.helenus.core.reflect.Drafted;
@ -50,6 +37,24 @@ import net.helenus.support.Fun.Tuple6;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;
import java.io.Closeable;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static net.helenus.core.Query.eq;
public class HelenusSession extends AbstractSessionOperations implements Closeable { public class HelenusSession extends AbstractSessionOperations implements Closeable {
public static final Object deleted = new Object(); public static final Object deleted = new Object();
@ -68,7 +73,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
private final SessionRepository sessionRepository; private final SessionRepository sessionRepository;
private final Executor executor; private final Executor executor;
private final boolean dropSchemaOnClose; private final boolean dropSchemaOnClose;
private final SessionCache<String, Object> sessionCache; private final CacheManager cacheManager;
private final RowColumnValueProvider valueProvider; private final RowColumnValueProvider valueProvider;
private final StatementColumnValuePreparer valuePreparer; private final StatementColumnValuePreparer valuePreparer;
private final Metadata metadata; private final Metadata metadata;
@ -77,21 +82,21 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
private volatile boolean showValues; private volatile boolean showValues;
HelenusSession( HelenusSession(
Session session, Session session,
String usingKeyspace, String usingKeyspace,
CodecRegistry registry, CodecRegistry registry,
boolean showCql, boolean showCql,
boolean showValues, boolean showValues,
PrintStream printStream, PrintStream printStream,
SessionRepositoryBuilder sessionRepositoryBuilder, SessionRepositoryBuilder sessionRepositoryBuilder,
Executor executor, Executor executor,
boolean dropSchemaOnClose, boolean dropSchemaOnClose,
ConsistencyLevel consistencyLevel, ConsistencyLevel consistencyLevel,
boolean defaultQueryIdempotency, boolean defaultQueryIdempotency,
Class<? extends UnitOfWork> unitOfWorkClass, Class<? extends UnitOfWork> unitOfWorkClass,
SessionCache sessionCache, CacheManager cacheManager,
MetricRegistry metricRegistry, MetricRegistry metricRegistry,
Tracer tracer) { Tracer tracer) {
this.session = session; this.session = session;
this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry; this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry;
this.usingKeyspace = this.usingKeyspace =
@ -109,11 +114,14 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
this.unitOfWorkClass = unitOfWorkClass; this.unitOfWorkClass = unitOfWorkClass;
this.metricRegistry = metricRegistry; this.metricRegistry = metricRegistry;
this.zipkinTracer = tracer; this.zipkinTracer = tracer;
this.cacheManager = cacheManger;
if (sessionCache == null) { if (cacheManager == null) {
this.sessionCache = SessionCache.<String, Object>defaultCache(); MutableConfiguration<String, Object> configuration = new MutableConfiguration<>();
} else { configuration.setStoreByValue(false);
this.sessionCache = sessionCache; configuration.setTypes(String.class, Object.class);
CachingProvider cachingProvider = Caching.getCachingProvider(GuavaCacheManager.class.getName());
cacheManager = cachingProvider.getCacheManager();
} }
this.valueProvider = new RowColumnValueProvider(this.sessionRepository); this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
@ -214,10 +222,13 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
@Override @Override
public Object checkCache(String tableName, List<Facet> facets) { public Object checkCache(String tableName, List<Facet> facets) {
Object result = null; Object result = null;
for (String key : CacheUtil.flatKeys(tableName, facets)) { Cache<String, Object> cache = cacheManager.getCache(tableName);
result = sessionCache.get(key); if (cache != null) {
if (result != null) { for (String key : CacheUtil.flatKeys(tableName, facets)) {
return result; result = cache.get(key);
if (result != null) {
return result;
}
} }
} }
return null; return null;
@ -226,7 +237,10 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
@Override @Override
public void cacheEvict(List<Facet> facets) { public void cacheEvict(List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets); String tableName = CacheUtil.schemaName(facets);
CacheUtil.flatKeys(tableName, facets).forEach(key -> sessionCache.invalidate(key)); Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
CacheUtil.flatKeys(tableName, facets).forEach(key -> cache.remove(key));
}
} }
@Override @Override
@ -278,6 +292,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
if (entity.isCacheable()) { if (entity.isCacheable()) {
List<Facet> boundFacets = new ArrayList<>(); List<Facet> boundFacets = new ArrayList<>();
String tableName = CacheUtil.schemaName(boundFacets);
for (Facet facet : entity.getFacets()) { for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) { if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet; UnboundFacet unboundFacet = (UnboundFacet) facet;
@ -305,34 +320,43 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
} }
} }
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets); List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
String tableName = CacheUtil.schemaName(boundFacets);
replaceCachedFacetValues(pojo, tableName, facetCombinations); replaceCachedFacetValues(pojo, tableName, facetCombinations);
} }
} }
List<List<Facet>> deletedFacetSets = if (cacheManager != null) {
uowCache List<List<Facet>> deletedFacetSets =
.values() uowCache
.stream() .values()
.filter(Either::isRight) .stream()
.map(Either::getRight) .filter(Either::isRight)
.collect(Collectors.toList()); .map(Either::getRight)
for (List<Facet> facets : deletedFacetSets) { .collect(Collectors.toList());
String tableName = CacheUtil.schemaName(facets); for (List<Facet> facets : deletedFacetSets) {
List<String> keys = CacheUtil.flatKeys(tableName, facets); String tableName = CacheUtil.schemaName(facets);
keys.forEach(key -> sessionCache.invalidate(key)); Cache<String, Object> cache = cacheManager.getCache(tableName);
} if (cache != null) {
List<String> keys = CacheUtil.flatKeys(tableName, facets);
keys.forEach(key -> cache.remove(key));
}
}
}
} }
private void replaceCachedFacetValues( private void replaceCachedFacetValues(
Object pojo, String tableName, List<String[]> facetCombinations) { Object pojo, String tableName, List<String[]> facetCombinations) {
for (String[] combination : facetCombinations) { for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination); String cacheKey = tableName + "." + Arrays.toString(combination);
if (pojo == null || pojo == HelenusSession.deleted) { if (cacheManager != null) {
sessionCache.invalidate(cacheKey); Cache<String, Object> cache = cacheManager.getCache(tableName);
} else { if (cache != null) {
sessionCache.put(cacheKey, pojo); if (pojo == null || pojo == HelenusSession.deleted) {
} cache.remove(cacheKey);
} else {
cache.put(cacheKey, pojo);
}
}
}
} }
} }

View file

@ -17,26 +17,8 @@ package net.helenus.core;
import brave.Tracer; import brave.Tracer;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.CodecRegistry; import com.datastax.driver.core.*;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.UserType;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import net.helenus.core.cache.SessionCache;
import net.helenus.core.reflect.DslExportable; import net.helenus.core.reflect.DslExportable;
import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusEntityType; import net.helenus.mapping.HelenusEntityType;
@ -47,6 +29,14 @@ import net.helenus.support.Either;
import net.helenus.support.HelenusException; import net.helenus.support.HelenusException;
import net.helenus.support.PackageUtil; import net.helenus.support.PackageUtil;
import javax.cache.CacheManager;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
public final class SessionInitializer extends AbstractSessionOperations { public final class SessionInitializer extends AbstractSessionOperations {
private final Session session; private final Session session;
@ -67,7 +57,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
private boolean dropUnusedIndexes = false; private boolean dropUnusedIndexes = false;
private KeyspaceMetadata keyspaceMetadata; private KeyspaceMetadata keyspaceMetadata;
private AutoDdl autoDdl = AutoDdl.UPDATE; private AutoDdl autoDdl = AutoDdl.UPDATE;
private SessionCache sessionCache = null; private CacheManager cacheManager = null;
SessionInitializer(Session session, String keyspace) { SessionInitializer(Session session, String keyspace) {
this.session = session; this.session = session;
@ -157,8 +147,8 @@ public final class SessionInitializer extends AbstractSessionOperations {
return this; return this;
} }
public SessionInitializer setSessionCache(SessionCache sessionCache) { public SessionInitializer setCacheManager(CacheManager cacheManager) {
this.sessionCache = sessionCache; this.cacheManager = cacheManager;
return this; return this;
} }
@ -304,7 +294,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
consistencyLevel, consistencyLevel,
idempotent, idempotent,
unitOfWorkClass, unitOfWorkClass,
sessionCache, cacheManager,
metricRegistry, metricRegistry,
zipkinTracer); zipkinTracer);
} }

View file

@ -1,43 +0,0 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.cache;
import com.google.common.cache.Cache;
public class GuavaCache<K, V> implements SessionCache<K, V> {
final Cache<K, V> cache;
GuavaCache(Cache<K, V> cache) {
this.cache = cache;
}
@Override
public void invalidate(K key) {
cache.invalidate(key);
}
@Override
public V get(K key) {
return cache.getIfPresent(key);
}
@Override
public void put(K key, V value) {
cache.put(key, value);
}
}

View file

@ -1,60 +0,0 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface SessionCache<K, V> {
static final Logger LOG = LoggerFactory.getLogger(SessionCache.class);
static <K, V> SessionCache<K, V> defaultCache() {
GuavaCache<K, V> cache;
RemovalListener<K, V> listener =
new RemovalListener<K, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> n) {
if (n.wasEvicted()) {
String cause = n.getCause().name();
LOG.info(cause);
}
}
};
cache =
new GuavaCache<K, V>(
CacheBuilder.newBuilder()
.maximumSize(25_000)
.expireAfterAccess(5, TimeUnit.MINUTES)
.softValues()
.removalListener(listener)
.build());
return cache;
}
void invalidate(K key);
V get(K key);
void put(K key, V value);
}