Finish first steps of JCache integration, UnitOfWork statement cache now merges into available JCache at commit.

This commit is contained in:
Greg Burd 2018-02-08 10:09:23 -05:00
parent 6858cf6f48
commit d69d8a3b1e
6 changed files with 102 additions and 27 deletions

8
NOTES
View file

@ -22,6 +22,14 @@ Operation/
`-- PreparedStreamOperation
----
@CompoundIndex()
create a new col in the same table called __idx_a_b_c that the hash of the concatenated values in that order is stored, create a normal index for that (CREATE INDEX ...)
if a query matches that set of columns then use that indexed col to fetch the desired results from that table
could also work with .in() query if materialized view exists
----
// TODO(gburd): create a statement that matches one that wasn't prepared
//String key =
// "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString();

View file

@ -28,12 +28,15 @@
<orderEntry type="library" name="Maven: com.github.jnr:jnr-x86asm:1.0.2" level="project" />
<orderEntry type="library" name="Maven: com.github.jnr:jnr-posix:3.0.27" level="project" />
<orderEntry type="library" name="Maven: com.github.jnr:jnr-constants:0.9.0" level="project" />
<orderEntry type="library" name="Maven: com.datastax.cassandra:cassandra-driver-extras:3.3.2" level="project" />
<orderEntry type="library" name="Maven: com.diffplug.durian:durian:3.4.0" level="project" />
<orderEntry type="library" name="Maven: org.aspectj:aspectjweaver:1.8.10" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.6" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-core:4.3.10.RELEASE" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
<orderEntry type="library" name="Maven: javax.cache:cache-api:1.1.0" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:20.0" level="project" />
<orderEntry type="library" name="Maven: ca.exprofesso:guava-jcache:1.0.4" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.java:zipkin:1.29.2" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.brave:brave:4.0.6" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.reporter:zipkin-reporter:0.6.12" level="project" />
@ -117,7 +120,6 @@
<orderEntry type="library" scope="TEST" name="Maven: org.jctools:jctools-core:1.2.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: commons-io:commons-io:2.5" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.github.stephenc:jamm:0.2.5" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-library:1.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.mockito:mockito-core:2.8.47" level="project" />

10
pom.xml
View file

@ -158,6 +158,16 @@
<groupId>ca.exprofesso</groupId>
<artifactId>guava-jcache</artifactId>
<version>1.0.4</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Metrics and tracing -->

View file

@ -28,6 +28,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheManager;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.operation.AbstractOperation;
@ -36,6 +40,7 @@ import net.helenus.mapping.MappingUtil;
import net.helenus.support.Either;
import net.helenus.support.HelenusException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -262,7 +267,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
@Override
public void cacheDelete(String key) {
statementCache.replace(key, deleted);
statementCache.put(key, deleted);
}
@Override
@ -303,7 +308,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
@Override
public Object cacheUpdate(String key, Object value) {
return statementCache.replace(key, value);
return statementCache.put(key, value);
}
@Override
@ -315,7 +320,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
if (facet.alone()) {
String columnName = facet.name() + "==" + facet.value();
if (result == null) result = cache.get(tableName, columnName);
cache.put(tableName, columnName, Either.left(value));
cache.put(tableName, columnName, Either.left(value));
}
}
}
@ -394,7 +399,30 @@ public abstract class AbstractUnitOfWork<E extends Exception>
applyPostCommitFunctions("committed", uow.commitThunks);
});
// Merge our cache into the session cache.
// Merge our statement cache into the session cache if it exists.
CacheManager cacheManager = session.getCacheManager();
if (cacheManager != null) {
for (Map.Entry<String, Object> entry : statementCache.entrySet()) {
String[] keyParts = entry.getKey().split("\\.");
if (keyParts.length == 2) {
String cacheName = keyParts[0];
String key = keyParts[1];
if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) {
Cache<Object, Object> cache = cacheManager.getCache(cacheName);
if (cache != null) {
Object value = entry.getValue();
if (value == deleted) {
cache.remove(key);
} else {
cache.put(key.toString(), value);
}
}
}
}
}
}
// Merge our cache into the session cache.
session.mergeCache(cache);
// Spoil any lingering futures that may be out there.

View file

@ -16,6 +16,7 @@
package net.helenus.core;
import brave.Tracer;
import ca.exprofesso.guava.jcache.GuavaCachingProvider;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*;
import com.google.common.collect.Table;
@ -40,7 +41,6 @@ import org.slf4j.LoggerFactory;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;
import java.io.Closeable;
import java.io.PrintStream;
@ -57,8 +57,8 @@ import static net.helenus.core.Query.eq;
public class HelenusSession extends AbstractSessionOperations implements Closeable {
public static final Object deleted = new Object();
private static final Logger LOG = LoggerFactory.getLogger(HelenusSession.class);
public static final Object deleted = new Object();
private static final Pattern classNameRegex =
Pattern.compile("^(?:\\w+\\.)+(?:(\\w+)|(\\w+)\\$.*)$");
@ -114,14 +114,12 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
this.unitOfWorkClass = unitOfWorkClass;
this.metricRegistry = metricRegistry;
this.zipkinTracer = tracer;
this.cacheManager = cacheManger;
if (cacheManager == null) {
MutableConfiguration<String, Object> configuration = new MutableConfiguration<>();
configuration.setStoreByValue(false);
configuration.setTypes(String.class, Object.class);
CachingProvider cachingProvider = Caching.getCachingProvider(GuavaCacheManager.class.getName());
cacheManager = cachingProvider.getCacheManager();
CachingProvider cachingProvider = Caching.getCachingProvider(GuavaCachingProvider.class.getName());
this.cacheManager = cachingProvider.getCacheManager();
} else {
this.cacheManager = cacheManager;
}
this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
@ -278,6 +276,9 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
@Override
public void mergeCache(Table<String, String, Either<Object, List<Facet>>> uowCache) {
if (cacheManager == null) {
return;
}
List<Object> items =
uowCache
.values()
@ -325,22 +326,22 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
}
if (cacheManager != null) {
List<List<Facet>> deletedFacetSets =
uowCache
.values()
.stream()
.filter(Either::isRight)
.map(Either::getRight)
.collect(Collectors.toList());
for (List<Facet> facets : deletedFacetSets) {
String tableName = CacheUtil.schemaName(facets);
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
List<String> keys = CacheUtil.flatKeys(tableName, facets);
keys.forEach(key -> cache.remove(key));
}
List<List<Facet>> deletedFacetSets =
uowCache
.values()
.stream()
.filter(Either::isRight)
.map(Either::getRight)
.collect(Collectors.toList());
for (List<Facet> facets : deletedFacetSets) {
String tableName = CacheUtil.schemaName(facets);
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
List<String> keys = CacheUtil.flatKeys(tableName, facets);
keys.forEach(key -> cache.remove(key));
}
}
}
}
private void replaceCachedFacetValues(
@ -360,6 +361,10 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
}
}
public CacheManager getCacheManager() {
return cacheManager;
}
public Metadata getMetadata() {
return metadata;
}

View file

@ -22,12 +22,17 @@ import com.datastax.driver.core.utils.UUIDs;
import java.io.Serializable;
import java.util.Date;
import java.util.UUID;
import javax.cache.CacheManager;
import javax.cache.configuration.MutableConfiguration;
import net.bytebuddy.utility.RandomString;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.UnitOfWork;
import net.helenus.core.annotation.Cacheable;
import net.helenus.core.reflect.Entity;
import net.helenus.mapping.MappingUtil;
import net.helenus.mapping.annotation.Constraints;
import net.helenus.mapping.annotation.Index;
import net.helenus.mapping.annotation.PartitionKey;
@ -75,6 +80,13 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.idempotentQueryExecution(true)
.get();
widget = session.dsl(Widget.class);
MutableConfiguration<String, Object> configuration = new MutableConfiguration<>();
configuration
.setStoreByValue(false)
.setReadThrough(false);
CacheManager cacheManager = session.getCacheManager();
cacheManager.createCache(MappingUtil.getTableName(Widget.class, true).toString(), configuration);
}
@Test
@ -332,8 +344,12 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
w2 =
session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null);
String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString();
uow.cacheUpdate(cacheKey, w1);
// This should remove the object from the cache.
session.delete(widget).where(widget::id, eq(key)).sync(uow);
uow.cacheDelete(cacheKey);
// This should fail to read from the cache.
w3 =
@ -452,6 +468,9 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.value(widget::id, key1)
.value(widget::name, RandomString.make(20))
.sync(uow);
String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key1.toString();
uow.cacheUpdate(cacheKey, w1);
/*
w2 = session.<Widget>upsert(w1)
.value(widget::a, RandomString.make(10))
@ -484,9 +503,12 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.value(widget::d, RandomString.make(10))
.sync(uow);
String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString();
uow.cacheUpdate(cacheKey, w1);
// This should read from the cache and get the same instance of a Widget.
w2 =
session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null);
uow.cacheUpdate(cacheKey, w1);
uow.commit()
.andThen(