diff --git a/NOTES b/NOTES index 4c16625..bbacfc9 100644 --- a/NOTES +++ b/NOTES @@ -356,40 +356,6 @@ begin: ----------------- ---- postCommitFunction - elapsedTime_.stop(); - if (purpose_ != null) { - logTimers("committed"); - } - ---- abort - elapsedTime_.stop(); - if (purpose_ != null) { - logTimers("aborted"); - } - - ------------------ - else { - Cache cache = session.getSessionCache(); - String[] keys = flattenFacets(facets); - for (String key : keys) { - Object value = cache.getIfPresent(key); - if (value != null) { - result = Optional.of(value); - break; - } - } - } - - ----------------------- - - - - ------------------------ - /*else { Cache cache = session.getSessionCache(); Map rowMap = this.cache.rowMap(); diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index 29e87ac..d7b666e 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -16,9 +16,12 @@ package net.helenus.core; import java.io.PrintStream; +import java.util.List; import java.util.concurrent.Executor; +import com.google.common.cache.Cache; import com.google.common.collect.Table; +import net.helenus.core.cache.Facet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +129,10 @@ public abstract class AbstractSessionOperations { throw new HelenusException(e); } + public Object checkCache(String tableName, List facets) { return null; } + + public void updateCache(Object pojo, List facets) { } + void printCql(String cql) { getPrintStream().println(cql); } diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index a266246..9792e4a 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -44,7 +44,8 @@ public abstract class AbstractUnitOfWork implements UnitOfW 
private String purpose_; private Stopwatch elapsedTime_; - public Stopwatch databaseTime_ = Stopwatch.createUnstarted(); + private Stopwatch databaseTime_ = Stopwatch.createUnstarted(); + private Stopwatch cacheLookupTime_ = Stopwatch.createUnstarted(); // Cache: private final Table cache = HashBasedTable.create(); @@ -56,12 +57,17 @@ public abstract class AbstractUnitOfWork implements UnitOfW this.parent = parent; } - @Override - public Stopwatch getExecutionTimer() { - return databaseTime_; - } + @Override + public Stopwatch getExecutionTimer() { + return databaseTime_; + } - @Override + @Override + public Stopwatch getCacheLookupTimer() { + return cacheLookupTime_; + } + + @Override public void addNestedUnitOfWork(UnitOfWork uow) { synchronized (nested) { nested.add((AbstractUnitOfWork) uow); @@ -100,8 +106,8 @@ public abstract class AbstractUnitOfWork implements UnitOfW @Override public Optional cacheLookup(List facets) { - Facet table = facets.remove(0); - String tableName = table.value().toString(); + Facet table = facets.remove(0); + String tableName = table.value().toString(); Optional result = Optional.empty(); for (Facet facet : facets) { String columnName = facet.name() + "==" + facet.value(); @@ -120,7 +126,7 @@ public abstract class AbstractUnitOfWork implements UnitOfW // Be sure to check all enclosing UnitOfWork caches as well, we may be nested. 
if (parent != null) { return parent.cacheLookup(facets); - } + } } return result; } diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index 10629b8..5f1a49a 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -180,15 +180,73 @@ public final class HelenusSession extends AbstractSessionOperations implements C } @Override + public Object checkCache(String tableName, List facets) { + List facetCombinations = CacheUtil.flattenFacets(facets); + Object result = null; + for (String[] combination : facetCombinations) { + String cacheKey = tableName + "." + Arrays.toString(combination); + result = sessionCache.getIfPresent(cacheKey); + if (result != null) { + return result; + } + } + return null; + } + + @Override + public void updateCache(Object pojo, List facets) { + Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; + List boundFacets = new ArrayList<>(); + for (Facet facet : facets) { + if (facet instanceof UnboundFacet) { + UnboundFacet unboundFacet = (UnboundFacet) facet; + UnboundFacet.Binder binder = unboundFacet.binder(); + unboundFacet.getProperties().forEach(prop -> { + if (valueMap == null) { + Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false); + binder.setValueForProperty(prop, value.toString()); + } else { + binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString()); + } + }); + if (binder.isBound()) { + boundFacets.add(binder.bind()); + } + } else { + boundFacets.add(facet); + } + } + Facet table = boundFacets.remove(0); + String tableName = table.value().toString(); + List facetCombinations = CacheUtil.flattenFacets(boundFacets); + Object value = sessionCache.getIfPresent(pojo); + Object mergedValue = null; + for (String[] combination : facetCombinations) { + String cacheKey = tableName + "." 
+ Arrays.toString(combination); + if (value == null) { + sessionCache.put(cacheKey, pojo); + } else { + if (mergedValue == null) { + mergedValue = pojo; + } else { + mergedValue = CacheUtil.merge(value, pojo); + } + sessionCache.put(cacheKey, mergedValue); + } + } + + } + + @Override public void mergeCache(Table uowCache) { List pojos = uowCache.values().stream().distinct() .collect(Collectors.toList()); for (Object pojo : pojos) { - HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo)); + HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo)); Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; if (entity.isCacheable()) { - List boundFacets = new ArrayList<>(); - for (Facet facet : entity.getFacets()) { + List boundFacets = new ArrayList<>(); + for (Facet facet : entity.getFacets()) { if (facet instanceof UnboundFacet) { UnboundFacet unboundFacet = (UnboundFacet) facet; UnboundFacet.Binder binder = unboundFacet.binder(); @@ -207,7 +265,9 @@ public final class HelenusSession extends AbstractSessionOperations implements C boundFacets.add(facet); } } - String tableName = entity.getName().toCql(); + //String tableName = entity.getName().toCql(); + Facet table = boundFacets.remove(0); + String tableName = table.value().toString(); List facetCombinations = CacheUtil.flattenFacets(boundFacets); Object value = sessionCache.getIfPresent(pojo); Object mergedValue = null; diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 26ec567..c0d8817 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -23,42 +23,44 @@ import net.helenus.core.cache.Facet; public interface UnitOfWork extends AutoCloseable { - /** - * Marks the beginning of a transactional section of work. Will write a record - * to the shared write-ahead log. - * - * @return the handle used to commit or abort the work.
- */ - UnitOfWork begin(); + /** + * Marks the beginning of a transactional section of work. Will write a record + * to the shared write-ahead log. + * + * @return the handle used to commit or abort the work. + */ + UnitOfWork begin(); - void addNestedUnitOfWork(UnitOfWork uow); + void addNestedUnitOfWork(UnitOfWork uow); - /** - * Checks to see if the work performed between calling begin and now can be - * committed or not. - * - * @return a function from which to chain work that only happens when commit is - * successful - * @throws X - * when the work overlaps with other concurrent writers. - */ - PostCommitFunction commit() throws X; + /** + * Checks to see if the work performed between calling begin and now can be + * committed or not. + * + * @return a function from which to chain work that only happens when commit is + * successful + * @throws X when the work overlaps with other concurrent writers. + */ + PostCommitFunction commit() throws X; - /** - * Explicitly abort the work within this unit of work. Any nested aborted unit - * of work will trigger the entire unit of work to commit. - */ - void abort(); + /** + * Explicitly abort the work within this unit of work. Any nested aborted unit + * of work will trigger the entire unit of work to commit. 
+ */ + void abort(); - boolean hasAborted(); + boolean hasAborted(); - boolean hasCommitted(); + boolean hasCommitted(); - Optional cacheLookup(List facets); + Optional cacheLookup(List facets); - void cacheUpdate(Object pojo, List facets); + void cacheUpdate(Object pojo, List facets); UnitOfWork setPurpose(String purpose); Stopwatch getExecutionTimer(); + + Stopwatch getCacheLookupTimer(); + } diff --git a/src/main/java/net/helenus/core/cache/CacheUtil.java b/src/main/java/net/helenus/core/cache/CacheUtil.java index 12ac79b..2fe5781 100644 --- a/src/main/java/net/helenus/core/cache/CacheUtil.java +++ b/src/main/java/net/helenus/core/cache/CacheUtil.java @@ -30,9 +30,6 @@ public class CacheUtil { } public static List flattenFacets(List facets) { - Facet table = facets.remove(0); - String tableName = table.value().toString(); - List combinations = CacheUtil.combinations(facets.stream() .filter(facet -> facet.value() != null) .map(facet -> { diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index 6ba269b..5ed322d 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -25,6 +25,7 @@ import com.codahale.metrics.Timer; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.google.common.base.Function; +import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -67,10 +68,36 @@ public abstract class AbstractOptionalOperation sync() {//throws TimeoutException { final Timer.Context context = requestLatency.time(); try { - ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, - showValues, false); - return transform(resultSet); - } finally { + Optional result = 
Optional.empty(); + E cacheResult = null; + boolean updateCache = true; + + if (enableCache) { + List facets = bindFacetValues(); + Facet table = facets.remove(0); + String tableName = table.value().toString(); + cacheResult = (E)sessionOps.checkCache(tableName, facets); + if (cacheResult != null) { + result = Optional.of(cacheResult); + updateCache = false; + } + } + + if (!result.isPresent()) { + // Formulate the query and execute it against the Cassandra cluster. + ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, + queryTimeoutUnits, + showValues, false); + + // Transform the query result set into the desired shape. + result = transform(resultSet); + } + + if (updateCache && result.isPresent()) { + sessionOps.updateCache(result.get(), getFacets()); + } + return result; + } finally { context.stop(); } } @@ -82,31 +109,41 @@ public abstract class AbstractOptionalOperation result = Optional.empty(); - E cacheResult = null; - String[] statementKeys = null; + Optional result = Optional.empty(); + E cacheResult = null; + boolean updateCache = true; if (enableCache) { - List facets = bindFacetValues(); + Stopwatch timer = uow.getCacheLookupTimer(); + timer.start(); + List facets = bindFacetValues(); cacheResult = checkCache(uow, facets); if (cacheResult != null) { result = Optional.of(cacheResult); + updateCache = false; + } else { + Facet table = facets.remove(0); + String tableName = table.value().toString(); + cacheResult = (E)sessionOps.checkCache(tableName, facets); + if (cacheResult != null) { + result = Optional.of(cacheResult); + } } + timer.stop(); } if (!result.isPresent()) { - // Formulate the query and execute it against the Cassandra cluster. + // Formulate the query and execute it against the Cassandra cluster. ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, true); - // Transform the query result set into the desired shape. 
+ // Transform the query result set into the desired shape. result = transform(resultSet); } - // If we have a result, it wasn't from cache, and we're caching things then we - // need to put this result - // into the cache for future requests to find. - if (enableCache && cacheResult == null && result.isPresent()) { + // If we have a result, it wasn't from the UOW cache, and we're caching things then we + // need to put this result into the cache for future requests to find. + if (updateCache && result.isPresent()) { updateCache(uow, result.get(), getFacets()); } diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index a51e522..c13558c 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -16,6 +16,7 @@ package net.helenus.core.operation; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; @@ -25,6 +26,7 @@ import com.codahale.metrics.Timer; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.google.common.base.Function; +import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -58,11 +60,42 @@ public abstract class AbstractStreamOperation sync() {//throws TimeoutException { - final Timer.Context context = requestLatency.time(); - try { - ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, - showValues, false); - return transform(resultSet); + final Timer.Context context = requestLatency.time(); + try { + Stream resultStream = null; + E cacheResult = null; + boolean updateCache = true; + + if (enableCache) { + List 
facets = bindFacetValues(); + Facet table = facets.remove(0); + String tableName = table.value().toString(); + cacheResult = (E) sessionOps.checkCache(tableName, facets); + if (cacheResult != null) { + resultStream = Stream.of(cacheResult); + updateCache = false; + } + } + + if (resultStream == null) { + // Formulate the query and execute it against the Cassandra cluster. + ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, + queryTimeoutUnits, + showValues, false); + + // Transform the query result set into the desired shape. + resultStream = transform(resultSet); + } + + if (enableCache && resultStream != null) { + List facets = getFacets(); + resultStream.forEach(result -> { + sessionOps.updateCache(result, facets); + }); + } + + return resultStream; + } finally { context.stop(); } @@ -74,30 +107,38 @@ public abstract class AbstractStreamOperation result = null; + Stream resultStream = null; E cachedResult = null; + boolean updateCache = true; if (enableCache) { + Stopwatch timer = uow.getCacheLookupTimer(); + timer.start(); List facets = bindFacetValues(); cachedResult = checkCache(uow, facets); if (cachedResult != null) { - result = Stream.of(cachedResult); + resultStream = Stream.of(cachedResult); + updateCache = false; } + timer.stop(); } - if (result == null) { + if (resultStream == null) { ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, true); - result = transform(resultSet); + resultStream = transform(resultSet); } // If we have a result and we're caching then we need to put it into the cache // for future requests to find. 
- if (enableCache && cachedResult != null) { - updateCache(uow, cachedResult, getFacets()); + if (updateCache && resultStream != null) { + List facets = getFacets(); + resultStream.forEach(result -> { + updateCache(uow, result, facets); + }); } - return result; + return resultStream; } finally { context.stop(); } diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java index 27e0d2d..27e86bb 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -57,7 +57,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { @Test public void testSelectAfterSelect() throws Exception { - Widget w1, w2; + Widget w1, w2, w3; UUID key = UUIDs.timeBased(); // This should inserted Widget, but not cache it. @@ -77,7 +77,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { // This should read from the cache and get the same instance of a Widget. w2 = - session.select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null); + session.select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null); uow.commit() .andThen( @@ -85,6 +85,9 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { Assert.assertEquals(w1, w2); }); } + + w3 = session.select(widget).where(widget::name, eq(w1.name())).single().sync().orElse(null); + Assert.assertEquals(w1, w3); } @Test