WIP: Session cache.

This commit is contained in:
Greg Burd 2017-10-23 12:17:20 -04:00
parent ecd3d71e47
commit fc83b8032c
9 changed files with 224 additions and 105 deletions

34
NOTES
View file

@@ -356,40 +356,6 @@ begin:
----------------- -----------------
--- postCommitFunction
elapsedTime_.stop();
if (purpose_ != null) {
logTimers("committed");
}
--- abort
elapsedTime_.stop();
if (purpose_ != null) {
logTimers("aborted");
}
-----------------
else {
Cache<String, Object> cache = session.getSessionCache();
String[] keys = flattenFacets(facets);
for (String key : keys) {
Object value = cache.getIfPresent(key);
if (value != null) {
result = Optional.of(value);
break;
}
}
}
----------------------
-----------------------
/*else { /*else {
Cache<String, Object> cache = session.getSessionCache(); Cache<String, Object> cache = session.getSessionCache();
Map<String, Object> rowMap = this.cache.rowMap(); Map<String, Object> rowMap = this.cache.rowMap();

View file

@@ -16,9 +16,12 @@
package net.helenus.core; package net.helenus.core;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import com.google.common.cache.Cache;
import com.google.common.collect.Table; import com.google.common.collect.Table;
import net.helenus.core.cache.Facet;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -126,6 +129,10 @@ public abstract class AbstractSessionOperations {
throw new HelenusException(e); throw new HelenusException(e);
} }
public Object checkCache(String tableName, List<Facet> facets) { return null; }
public void updateCache(Object pojo, List<Facet> facets) { }
void printCql(String cql) { void printCql(String cql) {
getPrintStream().println(cql); getPrintStream().println(cql);
} }

View file

@@ -44,7 +44,8 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
private String purpose_; private String purpose_;
private Stopwatch elapsedTime_; private Stopwatch elapsedTime_;
public Stopwatch databaseTime_ = Stopwatch.createUnstarted(); private Stopwatch databaseTime_ = Stopwatch.createUnstarted();
private Stopwatch cacheLookupTime_ = Stopwatch.createUnstarted();
// Cache: // Cache:
private final Table<String, String, Object> cache = HashBasedTable.create(); private final Table<String, String, Object> cache = HashBasedTable.create();
@@ -56,12 +57,17 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
this.parent = parent; this.parent = parent;
} }
@Override @Override
public Stopwatch getExecutionTimer() { public Stopwatch getExecutionTimer() {
return databaseTime_; return databaseTime_;
} }
@Override @Override
public Stopwatch getCacheLookupTimer() {
return cacheLookupTime_;
}
@Override
public void addNestedUnitOfWork(UnitOfWork<E> uow) { public void addNestedUnitOfWork(UnitOfWork<E> uow) {
synchronized (nested) { synchronized (nested) {
nested.add((AbstractUnitOfWork<E>) uow); nested.add((AbstractUnitOfWork<E>) uow);
@@ -100,8 +106,8 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
@Override @Override
public Optional<Object> cacheLookup(List<Facet> facets) { public Optional<Object> cacheLookup(List<Facet> facets) {
Facet table = facets.remove(0); Facet table = facets.remove(0);
String tableName = table.value().toString(); String tableName = table.value().toString();
Optional<Object> result = Optional.empty(); Optional<Object> result = Optional.empty();
for (Facet facet : facets) { for (Facet facet : facets) {
String columnName = facet.name() + "==" + facet.value(); String columnName = facet.name() + "==" + facet.value();
@@ -120,7 +126,7 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested. // Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
if (parent != null) { if (parent != null) {
return parent.cacheLookup(facets); return parent.cacheLookup(facets);
} }
} }
return result; return result;
} }

View file

@@ -180,15 +180,73 @@ public final class HelenusSession extends AbstractSessionOperations implements C
} }
@Override @Override
public Object checkCache(String tableName, List<Facet> facets) {
List<String[]> facetCombinations = CacheUtil.flattenFacets(facets);
Object result = null;
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
result = sessionCache.getIfPresent(cacheKey);
if (result != null) {
return result;
}
}
return null;
}
@Override
public void updateCache(Object pojo, List<Facet> facets) {
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : facets) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(prop, value.toString());
} else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
}
});
if (binder.isBound()) {
boundFacets.add(binder.bind());
}
} else {
boundFacets.add(facet);
}
}
Facet table = boundFacets.remove(0);
String tableName = table.value().toString();
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
Object value = sessionCache.getIfPresent(pojo);
Object mergedValue = null;
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
if (value == null) {
sessionCache.put(cacheKey, pojo);
} else {
if (mergedValue == null) {
mergedValue = pojo;
} else {
mergedValue = CacheUtil.merge(value, pojo);
}
sessionCache.put(mergedValue, pojo);
}
}
}
@Override
public void mergeCache(Table<String, String, Object> uowCache) { public void mergeCache(Table<String, String, Object> uowCache) {
List<Object> pojos = uowCache.values().stream().distinct() List<Object> pojos = uowCache.values().stream().distinct()
.collect(Collectors.toList()); .collect(Collectors.toList());
for (Object pojo : pojos) { for (Object pojo : pojos) {
HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo)); HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
if (entity.isCacheable()) { if (entity.isCacheable()) {
List<Facet> boundFacets = new ArrayList<>(); List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : entity.getFacets()) { for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) { if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet; UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder(); UnboundFacet.Binder binder = unboundFacet.binder();
@@ -207,7 +265,9 @@ public final class HelenusSession extends AbstractSessionOperations implements C
boundFacets.add(facet); boundFacets.add(facet);
} }
} }
String tableName = entity.getName().toCql(); //String tableName = entity.getName().toCql();
Facet table = boundFacets.remove(0);
String tableName = table.value().toString();
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets); List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
Object value = sessionCache.getIfPresent(pojo); Object value = sessionCache.getIfPresent(pojo);
Object mergedValue = null; Object mergedValue = null;

View file

@@ -23,42 +23,44 @@ import net.helenus.core.cache.Facet;
public interface UnitOfWork<X extends Exception> extends AutoCloseable { public interface UnitOfWork<X extends Exception> extends AutoCloseable {
/** /**
* Marks the beginning of a transactional section of work. Will write a record * Marks the beginning of a transactional section of work. Will write a record
* to the shared write-ahead log. * to the shared write-ahead log.
* *
* @return the handle used to commit or abort the work. * @return the handle used to commit or abort the work.
*/ */
UnitOfWork<X> begin(); UnitOfWork<X> begin();
void addNestedUnitOfWork(UnitOfWork<X> uow); void addNestedUnitOfWork(UnitOfWork<X> uow);
/** /**
* Checks to see if the work performed between calling begin and now can be * Checks to see if the work performed between calling begin and now can be
* committed or not. * committed or not.
* *
* @return a function from which to chain work that only happens when commit is * @return a function from which to chain work that only happens when commit is
* successful * successful
* @throws X * @throws X when the work overlaps with other concurrent writers.
* when the work overlaps with other concurrent writers. */
*/ PostCommitFunction<Void, Void> commit() throws X;
PostCommitFunction<Void, Void> commit() throws X;
/** /**
* Explicitly abort the work within this unit of work. Any nested aborted unit * Explicitly abort the work within this unit of work. Any nested aborted unit
* of work will trigger the entire unit of work to commit. * of work will trigger the entire unit of work to commit.
*/ */
void abort(); void abort();
boolean hasAborted(); boolean hasAborted();
boolean hasCommitted(); boolean hasCommitted();
Optional<Object> cacheLookup(List<Facet> facets); Optional<Object> cacheLookup(List<Facet> facets);
void cacheUpdate(Object pojo, List<Facet> facets); void cacheUpdate(Object pojo, List<Facet> facets);
UnitOfWork setPurpose(String purpose); UnitOfWork setPurpose(String purpose);
Stopwatch getExecutionTimer(); Stopwatch getExecutionTimer();
Stopwatch getCacheLookupTimer();
} }

View file

@@ -30,9 +30,6 @@ public class CacheUtil {
} }
public static List<String[]> flattenFacets(List<Facet> facets) { public static List<String[]> flattenFacets(List<Facet> facets) {
Facet table = facets.remove(0);
String tableName = table.value().toString();
List<String[]> combinations = CacheUtil.combinations(facets.stream() List<String[]> combinations = CacheUtil.combinations(facets.stream()
.filter(facet -> facet.value() != null) .filter(facet -> facet.value() != null)
.map(facet -> { .map(facet -> {

View file

@@ -25,6 +25,7 @@ import com.codahale.metrics.Timer;
import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@@ -67,10 +68,36 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
public Optional<E> sync() {//throws TimeoutException { public Optional<E> sync() {//throws TimeoutException {
final Timer.Context context = requestLatency.time(); final Timer.Context context = requestLatency.time();
try { try {
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, Optional<E> result = Optional.empty();
showValues, false); E cacheResult = null;
return transform(resultSet); boolean updateCache = true;
} finally {
if (enableCache) {
List<Facet> facets = bindFacetValues();
Facet table = facets.remove(0);
String tableName = table.value().toString();
cacheResult = (E)sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
result = Optional.of(cacheResult);
updateCache = false;
}
}
if (!result.isPresent()) {
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout,
queryTimeoutUnits,
showValues, false);
// Transform the query result set into the desired shape.
result = transform(resultSet);
}
if (updateCache && result.isPresent()) {
sessionOps.updateCache(result.get(), getFacets());
}
return result;
} finally {
context.stop(); context.stop();
} }
} }
@@ -82,31 +109,41 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
final Timer.Context context = requestLatency.time(); final Timer.Context context = requestLatency.time();
try { try {
Optional<E> result = Optional.empty(); Optional<E> result = Optional.empty();
E cacheResult = null; E cacheResult = null;
String[] statementKeys = null; boolean updateCache = true;
if (enableCache) { if (enableCache) {
List<Facet> facets = bindFacetValues(); Stopwatch timer = uow.getCacheLookupTimer();
timer.start();
List<Facet> facets = bindFacetValues();
cacheResult = checkCache(uow, facets); cacheResult = checkCache(uow, facets);
if (cacheResult != null) { if (cacheResult != null) {
result = Optional.of(cacheResult); result = Optional.of(cacheResult);
updateCache = false;
} else {
Facet table = facets.remove(0);
String tableName = table.value().toString();
cacheResult = (E)sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
result = Optional.of(cacheResult);
}
} }
timer.stop();
} }
if (!result.isPresent()) { if (!result.isPresent()) {
// Formulate the query and execute it against the Cassandra cluster. // Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true); showValues, true);
// Transform the query result set into the desired shape. // Transform the query result set into the desired shape.
result = transform(resultSet); result = transform(resultSet);
} }
// If we have a result, it wasn't from cache, and we're caching things then we // If we have a result, it wasn't from the UOW cache, and we're caching things then we
// need to put this result // need to put this result into the cache for future requests to find.
// into the cache for future requests to find. if (updateCache && result.isPresent()) {
if (enableCache && cacheResult == null && result.isPresent()) {
updateCache(uow, result.get(), getFacets()); updateCache(uow, result.get(), getFacets());
} }

View file

@@ -16,6 +16,7 @@
package net.helenus.core.operation; package net.helenus.core.operation;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@@ -25,6 +26,7 @@ import com.codahale.metrics.Timer;
import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@@ -58,11 +60,42 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
} }
public Stream<E> sync() {//throws TimeoutException { public Stream<E> sync() {//throws TimeoutException {
final Timer.Context context = requestLatency.time(); final Timer.Context context = requestLatency.time();
try { try {
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, Stream<E> resultStream = null;
showValues, false); E cacheResult = null;
return transform(resultSet); boolean updateCache = true;
if (enableCache) {
List<Facet> facets = bindFacetValues();
Facet table = facets.remove(0);
String tableName = table.value().toString();
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
resultStream = Stream.of(cacheResult);
updateCache = false;
}
}
if (resultStream == null) {
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout,
queryTimeoutUnits,
showValues, false);
// Transform the query result set into the desired shape.
resultStream = transform(resultSet);
}
if (enableCache && resultStream != null) {
List<Facet> facets = getFacets();
resultStream.forEach(result -> {
sessionOps.updateCache(result, facets);
});
}
return resultStream;
} finally { } finally {
context.stop(); context.stop();
} }
@@ -74,30 +107,38 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
final Timer.Context context = requestLatency.time(); final Timer.Context context = requestLatency.time();
try { try {
Stream<E> result = null; Stream<E> resultStream = null;
E cachedResult = null; E cachedResult = null;
boolean updateCache = true;
if (enableCache) { if (enableCache) {
Stopwatch timer = uow.getCacheLookupTimer();
timer.start();
List<Facet> facets = bindFacetValues(); List<Facet> facets = bindFacetValues();
cachedResult = checkCache(uow, facets); cachedResult = checkCache(uow, facets);
if (cachedResult != null) { if (cachedResult != null) {
result = Stream.of(cachedResult); resultStream = Stream.of(cachedResult);
updateCache = false;
} }
timer.stop();
} }
if (result == null) { if (resultStream == null) {
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true); showValues, true);
result = transform(resultSet); resultStream = transform(resultSet);
} }
// If we have a result and we're caching then we need to put it into the cache // If we have a result and we're caching then we need to put it into the cache
// for future requests to find. // for future requests to find.
if (enableCache && cachedResult != null) { if (updateCache && resultStream != null) {
updateCache(uow, cachedResult, getFacets()); List<Facet> facets = getFacets();
resultStream.forEach(result -> {
updateCache(uow, result, facets);
});
} }
return result; return resultStream;
} finally { } finally {
context.stop(); context.stop();
} }

View file

@@ -57,7 +57,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
@Test @Test
public void testSelectAfterSelect() throws Exception { public void testSelectAfterSelect() throws Exception {
Widget w1, w2; Widget w1, w2, w3;
UUID key = UUIDs.timeBased(); UUID key = UUIDs.timeBased();
// This should insert the Widget, but not cache it. // This should insert the Widget, but not cache it.
@@ -77,7 +77,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
// This should read from the cache and get the same instance of a Widget. // This should read from the cache and get the same instance of a Widget.
w2 = w2 =
session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null); session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null);
uow.commit() uow.commit()
.andThen( .andThen(
@@ -85,6 +85,9 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
Assert.assertEquals(w1, w2); Assert.assertEquals(w1, w2);
}); });
} }
w3 = session.<Widget>select(widget).where(widget::name, eq(w1.name())).single().sync().orElse(null);
Assert.assertEquals(w1, w3);
} }
@Test @Test