From ecd3d71e47fce4bf232dd00ea415568d4c2d7122 Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Mon, 23 Oct 2017 11:10:55 -0400 Subject: [PATCH] WIP: session cache populated with UOW data on commit. Also added logging for UOW time. --- NOTES | 50 +++++++--- .../core/AbstractSessionOperations.java | 9 +- .../net/helenus/core/AbstractUnitOfWork.java | 99 ++++++++----------- src/main/java/net/helenus/core/Helenus.java | 4 +- .../java/net/helenus/core/HelenusSession.java | 69 ++++++++++++- .../java/net/helenus/core/UnitOfWork.java | 4 +- .../net/helenus/core/cache/CacheUtil.java | 34 +++++-- .../operation/AbstractOptionalOperation.java | 2 +- .../operation/AbstractStatementOperation.java | 21 ++-- .../net/helenus/core/operation/Operation.java | 11 ++- .../core/operation/SelectOperation.java | 14 ++- .../core/unitofwork/UnitOfWorkTest.java | 2 + 12 files changed, 210 insertions(+), 109 deletions(-) diff --git a/NOTES b/NOTES index 43cdd6b..4c16625 100644 --- a/NOTES +++ b/NOTES @@ -355,17 +355,6 @@ begin: ----------------- - public void setPurpose(String purpose) { - purpose_ = purpose; - } - - public void logTimers(String what) { - LOG.info(String.format("UOW(%s) %s %s (total: %.3fµs db: %.3fµs or %2.2f%% of total time)", - hashCode(), purpose_, what, - elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0, - databaseTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0, - (elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / databaseTime_.elapsed(TimeUnit.MICROSECONDS)) * 100.0)); - } --- postCommitFunction elapsedTime_.stop(); @@ -378,3 +367,42 @@ begin: if (purpose_ != null) { logTimers("aborted"); } + + +----------------- + else { + Cache cache = session.getSessionCache(); + String[] keys = flattenFacets(facets); + for (String key : keys) { + Object value = cache.getIfPresent(key); + if (value != null) { + result = Optional.of(value); + break; + } + } + } + + +---------------------- + + + + +----------------------- + +/*else { + Cache cache = session.getSessionCache(); + Map rowMap = this.cache.rowMap(); + for (String rowKey : rowMap.keySet()) { + String keys = flattenFacets(facets); + for (String key : keys) { + Object value = cache.getIfPresent(key); + if (value != null) { + result = Optional.of(value); + break; + } + } + } + cache.put + } + */ diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index 8039650..29e87ac 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -18,6 +18,7 @@ package net.helenus.core; import java.io.PrintStream; import java.util.concurrent.Executor; +import com.google.common.collect.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +34,7 @@ import net.helenus.support.HelenusException; public abstract class AbstractSessionOperations { - final Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger LOG = LoggerFactory.getLogger(AbstractSessionOperations.class); public abstract Session currentSession(); @@ -87,8 +88,8 @@ public abstract class AbstractSessionOperations { } void log(Statement statement, boolean showValues) { - if (logger.isInfoEnabled()) { - logger.info("Execute statement " + statement); + if (LOG.isInfoEnabled()) { + LOG.info("Execute statement " + statement); } if (isShowCql()) { if (statement instanceof BuiltStatement) { @@ -116,6 +117,8 @@ public abstract class AbstractSessionOperations { return null; } + public void mergeCache(Table cache) {} + RuntimeException translateException(RuntimeException e) { if (e instanceof HelenusException) { return e; diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index 8779a75..a266246 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -16,20 +16,25 @@ package net.helenus.core; import java.util.*; -import java.util.stream.Collectors; +import java.util.concurrent.TimeUnit; import com.diffplug.common.base.Errors; import com.google.common.base.Stopwatch; -import com.google.common.cache.Cache; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; import com.google.common.collect.TreeTraverser; import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Encapsulates the concept of a "transaction" as a unit-of-work. */ public abstract class AbstractUnitOfWork implements UnitOfWork, AutoCloseable { + + + private static final Logger LOG = LoggerFactory.getLogger(AbstractUnitOfWork.class); + private final List> nested = new ArrayList<>(); private final HelenusSession session; private final AbstractUnitOfWork parent; @@ -65,17 +70,32 @@ public abstract class AbstractUnitOfWork implements UnitOfW @Override public UnitOfWork begin() { - elapsedTime_.start(); + elapsedTime_ = Stopwatch.createStarted(); // log.record(txn::start) return this; } - private void applyPostCommitFunctions() { + @Override + public UnitOfWork setPurpose(String purpose) { + purpose_ = purpose; + return this; + } + + public void logTimers(String what) { + double e = (double)elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0; + double d = (double)databaseTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0; + double f = ((double)databaseTime_.elapsed(TimeUnit.NANOSECONDS)) / ((double)elapsedTime_.elapsed(TimeUnit.NANOSECONDS)) * 100.0; + LOG.info(String.format("UOW(%s)%s %s (total: %.3fms db: %.3fms or %2.2f%% of total time)", + hashCode(), (purpose_ == null ? "" : " " + purpose_), what, e, d, f)); + } + + private void applyPostCommitFunctions() { if (!postCommit.isEmpty()) { for (CommitThunk f : postCommit) { f.apply(); } } + logTimers("committed"); } @Override @@ -100,16 +120,6 @@ public abstract class AbstractUnitOfWork implements UnitOfW // Be sure to check all enclosing UnitOfWork caches as well, we may be nested. if (parent != null) { return parent.cacheLookup(facets); - } else { - Cache cache = session.getSessionCache(); - String[] keys = flattenFacets(facets); - for (String key : keys) { - Object value = cache.getIfPresent(key); - if (value != null) { - result = Optional.of(value); - break; - } - } } } return result; @@ -125,23 +135,6 @@ public abstract class AbstractUnitOfWork implements UnitOfW } } - private String[] flattenFacets(List facets) { - Facet table = facets.remove(0); - String tableName = table.value().toString(); - - List combinations = CacheUtil.combinations(facets.stream() - .map(facet -> { - return facet.name() + "==" + facet.value(); - }).collect(Collectors.toList()).toArray(new String[facets.size()])); - - int i = 0; - String[] results = new String[facets.size()]; - for (String[] combination : combinations) { - results[i++] = tableName + "." + combination; - } - return results; - } - private Iterator> getChildNodes() { return nested.iterator(); } @@ -182,22 +175,10 @@ public abstract class AbstractUnitOfWork implements UnitOfW // Merge UOW cache into parent's cache. if (parent != null) { parent.mergeCache(cache); - } /*else { - Cache cache = session.getSessionCache(); - Map rowMap = this.cache.rowMap(); - for (String rowKey : rowMap.keySet()) { - String keys = flattenFacets(facets); - for (String key : keys) { - Object value = cache.getIfPresent(key); - if (value != null) { - result = Optional.of(value); - break; - } - } - } - cache.put + } else { + session.mergeCache(cache); } - */ + elapsedTime_.stop(); // Apply all post-commit functions for if (parent == null) { @@ -223,23 +204,21 @@ public abstract class AbstractUnitOfWork implements UnitOfW }); // log.record(txn::abort) // cache.invalidateSince(txn::start time) + elapsedTime_.stop(); + logTimers("aborted"); } private void mergeCache(Table from) { - Table to = this.cache; - from.rowMap().forEach((rowKey, columnMap) -> { - columnMap.forEach((columnKey, value) -> { - if (to.contains(rowKey, columnKey)) { - to.put(rowKey, columnKey, merge(to.get(rowKey, columnKey), from.get(rowKey, columnKey))); - } else { - to.put(rowKey, columnKey, from.get(rowKey, columnKey)); - } - }); - }); - } - - private Object merge(Object to, Object from) { - return to; // TODO(gburd): yeah... + Table to = this.cache; + from.rowMap().forEach((rowKey, columnMap) -> { + columnMap.forEach((columnKey, value) -> { + if (to.contains(rowKey, columnKey)) { + to.put(rowKey, columnKey, CacheUtil.merge(to.get(rowKey, columnKey), from.get(rowKey, columnKey))); + } else { + to.put(rowKey, columnKey, from.get(rowKey, columnKey)); + } + }); + }); } public String describeConflicts() { diff --git a/src/main/java/net/helenus/core/Helenus.java b/src/main/java/net/helenus/core/Helenus.java index 7ea1c3e..686b8dc 100644 --- a/src/main/java/net/helenus/core/Helenus.java +++ b/src/main/java/net/helenus/core/Helenus.java @@ -184,7 +184,9 @@ public final class Helenus { throw new HelenusMappingException("class is not an interface " + iface); } - metadataForEntity.putIfAbsent(iface, metadata); + if (metadata != null) { + metadataForEntity.putIfAbsent(iface, metadata); + } return entity(iface, metadata); } diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index 4776bbb..10629b8 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -21,20 +21,26 @@ import java.io.Closeable; import java.io.PrintStream; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; import com.codahale.metrics.MetricRegistry; import com.datastax.driver.core.*; import brave.Tracer; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Table; +import net.helenus.core.cache.CacheUtil; +import net.helenus.core.cache.Facet; +import net.helenus.core.cache.UnboundFacet; import net.helenus.core.operation.*; import net.helenus.core.reflect.Drafted; import net.helenus.core.reflect.HelenusPropertyNode; +import net.helenus.core.reflect.MapExportable; import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.MappingUtil; import net.helenus.mapping.value.*; @@ -64,6 +70,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C private final SessionRepository sessionRepository; private final Executor executor; private final boolean dropSchemaOnClose; + private final Cache sessionCache; private final RowColumnValueProvider valueProvider; private final StatementColumnValuePreparer valuePreparer; @@ -88,6 +95,9 @@ public final class HelenusSession extends AbstractSessionOperations implements C this.metricRegistry = metricRegistry; this.zipkinTracer = tracer; + this.sessionCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE) + .expireAfterAccess(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).recordStats().build(); + this.valueProvider = new RowColumnValueProvider(this.sessionRepository); this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository); this.metadata = session.getCluster().getMetadata(); @@ -169,7 +179,56 @@ public final class HelenusSession extends AbstractSessionOperations implements C return defaultQueryIdempotency; } - public Metadata getMetadata() { + @Override + public void mergeCache(Table uowCache) { + List pojos = uowCache.values().stream().distinct() + .collect(Collectors.toList()); + for (Object pojo : pojos) { + HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo)); + Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; + if (entity.isCacheable()) { + List boundFacets = new ArrayList<>(); + for (Facet facet : entity.getFacets()) { + if (facet instanceof UnboundFacet) { + UnboundFacet unboundFacet = (UnboundFacet) facet; + UnboundFacet.Binder binder = unboundFacet.binder(); + unboundFacet.getProperties().forEach(prop -> { + if (valueMap == null) { + Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false); + binder.setValueForProperty(prop, value.toString()); + } else { + binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString()); + } + }); + if (binder.isBound()) { + boundFacets.add(binder.bind()); + } + } else { + boundFacets.add(facet); + } + } + String tableName = entity.getName().toCql(); + List facetCombinations = CacheUtil.flattenFacets(boundFacets); + Object value = sessionCache.getIfPresent(pojo); + Object mergedValue = null; + for (String[] combination : facetCombinations) { + String cacheKey = tableName + "." + Arrays.toString(combination); + if (value == null) { + sessionCache.put(cacheKey, pojo); + } else { + if (mergedValue == null) { + mergedValue = pojo; + } else { + mergedValue = CacheUtil.merge(value, pojo); + } + sessionCache.put(mergedValue, pojo); + } + } + } + } + } + + public Metadata getMetadata() { return metadata; } diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 2049fb5..26ec567 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -58,5 +58,7 @@ public interface UnitOfWork extends AutoCloseable { void cacheUpdate(Object pojo, List facets); - Stopwatch getExecutionTimer(); + UnitOfWork setPurpose(String purpose); + + Stopwatch getExecutionTimer(); } diff --git a/src/main/java/net/helenus/core/cache/CacheUtil.java b/src/main/java/net/helenus/core/cache/CacheUtil.java index 89f4769..12ac79b 100644 --- a/src/main/java/net/helenus/core/cache/CacheUtil.java +++ b/src/main/java/net/helenus/core/cache/CacheUtil.java @@ -3,30 +3,46 @@ package net.helenus.core.cache; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; public class CacheUtil { - public static List combinations(String... items) { - int n = items.length; + public static List combinations(List items) { + int n = items.size(); if (n > 20 || n < 0) throw new IllegalArgumentException(n + " is out of range"); long e = Math.round(Math.pow(2, n)); List out = new ArrayList((int) e - 1); - Arrays.sort(items); - for (int k = 1; k <= items.length; k++) { - kcomb(items, 0, k, new String[k], out); + for (int k = 1; k <= items.size(); k++) { + kCombinations(items, 0, k, new String[k], out); } return out; } - private static void kcomb(String[] items, int n, int k, String[] arr, List out) { + private static void kCombinations(List items, int n, int k, String[] arr, List out) { if (k == 0) { out.add(arr.clone()); } else { - for (int i = n; i <= items.length - k; i++) { - arr[arr.length - k] = items[i]; - kcomb(items, i + 1, k - 1, arr, out); + for (int i = n; i <= items.size() - k; i++) { + arr[arr.length - k] = items.get(i); + kCombinations(items, i + 1, k - 1, arr, out); } } } + public static List flattenFacets(List facets) { + Facet table = facets.remove(0); + String tableName = table.value().toString(); + + List combinations = CacheUtil.combinations(facets.stream() + .filter(facet -> facet.value() != null) + .map(facet -> { + return facet.name() + "==" + facet.value(); + }).collect(Collectors.toList())); + return combinations; + } + + public static Object merge(Object to, Object from) { + return to; // TODO(gburd): yeah... + } + } diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index 86e3dd8..6ba269b 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -91,7 +91,7 @@ public abstract class AbstractOptionalOperation> extends Operation { - final Logger logger = LoggerFactory.getLogger(getClass()); + private static final Logger LOG = LoggerFactory.getLogger(AbstractStatementOperation.class); + protected boolean enableCache = true; protected boolean showValues = true; protected TraceContext traceContext; @@ -326,14 +327,14 @@ public abstract class AbstractStatementOperation uow, E pojo, List identifyingFacets) { List facets = new ArrayList<>(); - Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; + Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; for (Facet facet : identifyingFacets) { if (facet instanceof UnboundFacet) { UnboundFacet unboundFacet = (UnboundFacet) facet; UnboundFacet.Binder binder = unboundFacet.binder(); unboundFacet.getProperties().forEach(prop -> { - if (valueMap == null) { - Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false); - binder.setValueForProperty(prop, value.toString()); - } else { - binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString()); - } + if (valueMap == null) { + Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false); + binder.setValueForProperty(prop, value.toString()); + } else { + binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString()); + } facets.add(binder.bind()); }); } else { diff --git a/src/main/java/net/helenus/core/operation/Operation.java b/src/main/java/net/helenus/core/operation/Operation.java index 64c6a39..2ce21a2 100644 --- a/src/main/java/net/helenus/core/operation/Operation.java +++ b/src/main/java/net/helenus/core/operation/Operation.java @@ -68,11 +68,16 @@ public abstract class Operation { } Statement statement = options(buildStatement(cached)); - Stopwatch timer = uow.getExecutionTimer(); - timer.start(); + Stopwatch timer = null; + if (uow != null) { + timer = uow.getExecutionTimer(); + timer.start(); + } ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); ResultSet resultSet = futureResultSet.getUninterruptibly(); //TODO(gburd): (timeout, units); - timer.stop(); + + if (uow != null) timer.stop(); + return resultSet; } finally { diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index 59709fe..6de43ad 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -41,9 +41,13 @@ import net.helenus.mapping.value.ColumnValueProvider; import net.helenus.mapping.value.ValueProviderMap; import net.helenus.support.Fun; import net.helenus.support.HelenusMappingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class SelectOperation extends AbstractFilterStreamOperation> { + private static final Logger LOG = LoggerFactory.getLogger(SelectOperation.class); + protected final List props = new ArrayList(); protected Function rowMapper = null; protected List ordering = null; @@ -188,10 +192,10 @@ public final class SelectOperation extends AbstractFilterStreamOperation boundFacets = new ArrayList<>(); for (Facet facet : entity.getFacets()) { - if (facet instanceof UnboundFacet) { - UnboundFacet unboundFacet = (UnboundFacet) facet; - UnboundFacet.Binder binder = unboundFacet.binder(); - unboundFacet.getProperties().forEach(prop -> { + if (facet instanceof UnboundFacet) { + UnboundFacet unboundFacet = (UnboundFacet) facet; + UnboundFacet.Binder binder = unboundFacet.binder(); + unboundFacet.getProperties().forEach(prop -> { Filter filter = filters.get(prop); if (filter != null) { Object[] postulates = filter.postulateValues(); @@ -277,7 +281,7 @@ public final class SelectOperation extends AbstractFilterStreamOperationselect(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null);