WIP: session cache populated with UOW data on commit. Also added logging for UOW time.

Greg Burd 2017-10-23 11:10:55 -04:00
parent dc9c228e4a
commit ecd3d71e47
12 changed files with 210 additions and 109 deletions

NOTES

@@ -355,17 +355,6 @@ begin:
-----------------
public void setPurpose(String purpose) {
purpose_ = purpose;
}
public void logTimers(String what) {
LOG.info(String.format("UOW(%s) %s %s (total: %.3fµs db: %.3fµs or %2.2f%% of total time)",
hashCode(), purpose_, what,
elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0,
databaseTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0,
(elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / databaseTime_.elapsed(TimeUnit.MICROSECONDS)) * 100.0));
}
--- postCommitFunction
elapsedTime_.stop();
@@ -378,3 +367,42 @@ begin:
if (purpose_ != null) {
logTimers("aborted");
}
-----------------
else {
Cache<String, Object> cache = session.getSessionCache();
String[] keys = flattenFacets(facets);
for (String key : keys) {
Object value = cache.getIfPresent(key);
if (value != null) {
result = Optional.of(value);
break;
}
}
}
----------------------
-----------------------
/*else {
Cache<String, Object> cache = session.getSessionCache();
Map<String, Object> rowMap = this.cache.rowMap();
for (String rowKey : rowMap.keySet()) {
String keys = flattenFacets(facets);
for (String key : keys) {
Object value = cache.getIfPresent(key);
if (value != null) {
result = Optional.of(value);
break;
}
}
}
cache.put
}
*/

AbstractSessionOperations.java

@@ -18,6 +18,7 @@ package net.helenus.core;
import java.io.PrintStream;
import java.util.concurrent.Executor;
import com.google.common.collect.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +34,7 @@ import net.helenus.support.HelenusException;
public abstract class AbstractSessionOperations {
final Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger LOG = LoggerFactory.getLogger(AbstractSessionOperations.class);
public abstract Session currentSession();
@@ -87,8 +88,8 @@ public abstract class AbstractSessionOperations {
}
void log(Statement statement, boolean showValues) {
if (logger.isInfoEnabled()) {
logger.info("Execute statement " + statement);
if (LOG.isInfoEnabled()) {
LOG.info("Execute statement " + statement);
}
if (isShowCql()) {
if (statement instanceof BuiltStatement) {
@@ -116,6 +117,8 @@ public abstract class AbstractSessionOperations {
return null;
}
public void mergeCache(Table<String, String, Object> cache) {}
RuntimeException translateException(RuntimeException e) {
if (e instanceof HelenusException) {
return e;
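
Note: the mergeCache(Table<String, String, Object>) hook added above is a no-op at this level; a nested unit of work overrides it to fold its per-transaction Table into its parent's cache cell by cell (see AbstractUnitOfWork below). A minimal, self-contained sketch of that merge using Guava's HashBasedTable — the row/column keys and the keep-existing collision policy here are illustrative only:

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;

public class TableMergeSketch {

    // Fold every cell of `from` into `to`; on collision keep the existing cell
    // (mirrors the current merge TODO, which simply returns the left-hand value).
    static void mergeInto(Table<String, String, Object> to, Table<String, String, Object> from) {
        from.rowMap().forEach((rowKey, columnMap) ->
                columnMap.forEach((columnKey, value) -> {
                    if (!to.contains(rowKey, columnKey)) {
                        to.put(rowKey, columnKey, value);
                    }
                }));
    }

    public static void main(String[] args) {
        Table<String, String, Object> parent = HashBasedTable.create();
        Table<String, String, Object> child = HashBasedTable.create();
        parent.put("widget", "id==1", "widget-v1");
        child.put("widget", "id==1", "widget-v1-updated"); // collision: parent's value wins
        child.put("widget", "id==2", "widget-v2");         // new cell: copied into parent
        mergeInto(parent, child);
        System.out.println(parent.row("widget").keySet()); // contains id==1 and id==2
    }
}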

AbstractUnitOfWork.java

@@ -16,20 +16,25 @@
package net.helenus.core;
import java.util.*;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
import com.diffplug.common.base.Errors;
import com.google.common.base.Stopwatch;
import com.google.common.cache.Cache;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.google.common.collect.TreeTraverser;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfWork<E>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractUnitOfWork.class);
private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>();
private final HelenusSession session;
private final AbstractUnitOfWork<E> parent;
@@ -65,17 +70,32 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
@Override
public UnitOfWork<E> begin() {
elapsedTime_.start();
elapsedTime_ = Stopwatch.createStarted();
// log.record(txn::start)
return this;
}
private void applyPostCommitFunctions() {
@Override
public UnitOfWork setPurpose(String purpose) {
purpose_ = purpose;
return this;
}
public void logTimers(String what) {
double e = (double)elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double d = (double)databaseTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double f = ((double)databaseTime_.elapsed(TimeUnit.NANOSECONDS)) / ((double)elapsedTime_.elapsed(TimeUnit.NANOSECONDS)) * 100.0;
LOG.info(String.format("UOW(%s)%s %s (total: %.3fms db: %.3fms or %2.2f%% of total time)",
hashCode(), (purpose_ == null ? "" : " " + purpose_), what, e, d, f));
}
private void applyPostCommitFunctions() {
if (!postCommit.isEmpty()) {
for (CommitThunk f : postCommit) {
f.apply();
}
}
logTimers("committed");
}
@Override
@@ -100,16 +120,6 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
if (parent != null) {
return parent.cacheLookup(facets);
} else {
Cache<String, Object> cache = session.getSessionCache();
String[] keys = flattenFacets(facets);
for (String key : keys) {
Object value = cache.getIfPresent(key);
if (value != null) {
result = Optional.of(value);
break;
}
}
}
}
return result;
@@ -125,23 +135,6 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
}
}
private String[] flattenFacets(List<Facet> facets) {
Facet table = facets.remove(0);
String tableName = table.value().toString();
List<String[]> combinations = CacheUtil.combinations(facets.stream()
.map(facet -> {
return facet.name() + "==" + facet.value();
}).collect(Collectors.toList()).toArray(new String[facets.size()]));
int i = 0;
String[] results = new String[facets.size()];
for (String[] combination : combinations) {
results[i++] = tableName + "." + combination;
}
return results;
}
private Iterator<AbstractUnitOfWork<E>> getChildNodes() {
return nested.iterator();
}
@@ -182,22 +175,10 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
// Merge UOW cache into parent's cache.
if (parent != null) {
parent.mergeCache(cache);
} /*else {
Cache<String, Object> cache = session.getSessionCache();
Map<String, Object> rowMap = this.cache.rowMap();
for (String rowKey : rowMap.keySet()) {
String keys = flattenFacets(facets);
for (String key : keys) {
Object value = cache.getIfPresent(key);
if (value != null) {
result = Optional.of(value);
break;
}
}
}
cache.put
} else {
session.mergeCache(cache);
}
*/
elapsedTime_.stop();
// Apply all post-commit functions for
if (parent == null) {
@@ -223,23 +204,21 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
});
// log.record(txn::abort)
// cache.invalidateSince(txn::start time)
elapsedTime_.stop();
logTimers("aborted");
}
private void mergeCache(Table<String, String, Object> from) {
Table<String, String, Object> to = this.cache;
from.rowMap().forEach((rowKey, columnMap) -> {
columnMap.forEach((columnKey, value) -> {
if (to.contains(rowKey, columnKey)) {
to.put(rowKey, columnKey, merge(to.get(rowKey, columnKey), from.get(rowKey, columnKey)));
} else {
to.put(rowKey, columnKey, from.get(rowKey, columnKey));
}
});
});
}
private Object merge(Object to, Object from) {
return to; // TODO(gburd): yeah...
Table<String, String, Object> to = this.cache;
from.rowMap().forEach((rowKey, columnMap) -> {
columnMap.forEach((columnKey, value) -> {
if (to.contains(rowKey, columnKey)) {
to.put(rowKey, columnKey, CacheUtil.merge(to.get(rowKey, columnKey), from.get(rowKey, columnKey)));
} else {
to.put(rowKey, columnKey, from.get(rowKey, columnKey));
}
});
});
}
public String describeConflicts() {
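
The reworked logTimers(...) above reports both totals in milliseconds and computes the database share of elapsed time as databaseTime / elapsedTime (the older NOTES snippet had that ratio inverted and labeled the values µs). A small self-contained sketch of the same measurement pattern with Guava's Stopwatch; the sleeps stand in for application and driver work:

import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;

public class UowTimingSketch {
    public static void main(String[] args) throws InterruptedException {
        Stopwatch elapsed = Stopwatch.createStarted();    // total unit-of-work time
        Stopwatch database = Stopwatch.createUnstarted(); // time spent in the driver

        Thread.sleep(5);      // application work (illustrative)
        database.start();
        Thread.sleep(10);     // simulated database round trip
        database.stop();
        elapsed.stop();

        double totalMs = elapsed.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
        double dbMs = database.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
        double dbShare = (double) database.elapsed(TimeUnit.NANOSECONDS)
                / (double) elapsed.elapsed(TimeUnit.NANOSECONDS) * 100.0;

        System.out.println(String.format(
                "UOW(demo) committed (total: %.3fms db: %.3fms or %2.2f%% of total time)",
                totalMs, dbMs, dbShare));
    }
}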

Helenus.java

@@ -184,7 +184,9 @@ public final class Helenus {
throw new HelenusMappingException("class is not an interface " + iface);
}
metadataForEntity.putIfAbsent(iface, metadata);
if (metadata != null) {
metadataForEntity.putIfAbsent(iface, metadata);
}
return entity(iface, metadata);
}

HelenusSession.java

@@ -21,20 +21,26 @@ import java.io.Closeable;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*;
import brave.Tracer;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Table;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.UnboundFacet;
import net.helenus.core.operation.*;
import net.helenus.core.reflect.Drafted;
import net.helenus.core.reflect.HelenusPropertyNode;
import net.helenus.core.reflect.MapExportable;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.MappingUtil;
import net.helenus.mapping.value.*;
@@ -64,6 +70,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
private final SessionRepository sessionRepository;
private final Executor executor;
private final boolean dropSchemaOnClose;
private final Cache sessionCache;
private final RowColumnValueProvider valueProvider;
private final StatementColumnValuePreparer valuePreparer;
@@ -88,6 +95,9 @@ public final class HelenusSession extends AbstractSessionOperations implements C
this.metricRegistry = metricRegistry;
this.zipkinTracer = tracer;
this.sessionCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE)
.expireAfterAccess(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).recordStats().build();
this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository);
this.metadata = session.getCluster().getMetadata();
@@ -169,7 +179,56 @@ public final class HelenusSession extends AbstractSessionOperations implements C
return defaultQueryIdempotency;
}
public Metadata getMetadata() {
@Override
public void mergeCache(Table<String, String, Object> uowCache) {
List<Object> pojos = uowCache.values().stream().distinct()
.collect(Collectors.toList());
for (Object pojo : pojos) {
HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
if (entity.isCacheable()) {
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(prop, value.toString());
} else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
}
});
if (binder.isBound()) {
boundFacets.add(binder.bind());
}
} else {
boundFacets.add(facet);
}
}
String tableName = entity.getName().toCql();
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
Object value = sessionCache.getIfPresent(pojo);
Object mergedValue = null;
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
if (value == null) {
sessionCache.put(cacheKey, pojo);
} else {
if (mergedValue == null) {
mergedValue = pojo;
} else {
mergedValue = CacheUtil.merge(value, pojo);
}
sessionCache.put(mergedValue, pojo);
}
}
}
}
}
public Metadata getMetadata() {
return metadata;
}
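
mergeCache(...) above promotes distinct unit-of-work results into the session-level Guava cache, keying each pojo by one entry per bound-facet combination of the form tableName + "." + Arrays.toString(combination). A self-contained sketch of that keying scheme — the entity name, facet values, and cache sizing below are made up for illustration:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SessionCacheKeySketch {
    public static void main(String[] args) {
        Cache<String, Object> sessionCache = CacheBuilder.newBuilder()
                .maximumSize(10_000)
                .expireAfterAccess(10, TimeUnit.MINUTES)
                .recordStats()
                .build();

        // One key per non-empty combination of the entity's bound facets.
        String tableName = "widget";
        List<String[]> facetCombinations = Arrays.asList(
                new String[]{"id==1"},
                new String[]{"name==a"},
                new String[]{"id==1", "name==a"});

        Object pojo = "widget-pojo"; // stands in for the mapped entity instance
        for (String[] combination : facetCombinations) {
            String cacheKey = tableName + "." + Arrays.toString(combination);
            sessionCache.put(cacheKey, pojo);
        }

        // Any one of the generated keys now resolves to the cached pojo.
        System.out.println(sessionCache.getIfPresent("widget.[id==1, name==a]"));
    }
}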

UnitOfWork.java

@@ -58,5 +58,7 @@ public interface UnitOfWork<X extends Exception> extends AutoCloseable {
void cacheUpdate(Object pojo, List<Facet> facets);
Stopwatch getExecutionTimer();
UnitOfWork setPurpose(String purpose);
Stopwatch getExecutionTimer();
}

CacheUtil.java

@@ -3,30 +3,46 @@ package net.helenus.core.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class CacheUtil {
public static List<String[]> combinations(String... items) {
int n = items.length;
public static List<String[]> combinations(List<String> items) {
int n = items.size();
if (n > 20 || n < 0) throw new IllegalArgumentException(n + " is out of range");
long e = Math.round(Math.pow(2, n));
List<String[]> out = new ArrayList<String[]>((int) e - 1);
Arrays.sort(items);
for (int k = 1; k <= items.length; k++) {
kcomb(items, 0, k, new String[k], out);
for (int k = 1; k <= items.size(); k++) {
kCombinations(items, 0, k, new String[k], out);
}
return out;
}
private static void kcomb(String[] items, int n, int k, String[] arr, List<String[]> out) {
private static void kCombinations(List<String> items, int n, int k, String[] arr, List<String[]> out) {
if (k == 0) {
out.add(arr.clone());
} else {
for (int i = n; i <= items.length - k; i++) {
arr[arr.length - k] = items[i];
kcomb(items, i + 1, k - 1, arr, out);
for (int i = n; i <= items.size() - k; i++) {
arr[arr.length - k] = items.get(i);
kCombinations(items, i + 1, k - 1, arr, out);
}
}
}
public static List<String[]> flattenFacets(List<Facet> facets) {
Facet table = facets.remove(0);
String tableName = table.value().toString();
List<String[]> combinations = CacheUtil.combinations(facets.stream()
.filter(facet -> facet.value() != null)
.map(facet -> {
return facet.name() + "==" + facet.value();
}).collect(Collectors.toList()));
return combinations;
}
public static Object merge(Object to, Object from) {
return to; // TODO(gburd): yeah...
}
}
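
combinations(...) above enumerates every non-empty subset of the facet strings (2^n - 1 of them, which is why n is capped at 20), and flattenFacets(...) then prefixes each subset with the table name to form cache keys. A compact bitmask variant of the same enumeration, shown only to illustrate the output shape; the facet values are made up:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FacetCombinationsSketch {

    // All non-empty subsets of `items`, preserving element order within each subset.
    static List<String[]> combinations(List<String> items) {
        int n = items.size();
        if (n > 20) throw new IllegalArgumentException(n + " is out of range");
        List<String[]> out = new ArrayList<>((1 << n) - 1);
        for (int mask = 1; mask < (1 << n); mask++) {
            List<String> subset = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                if ((mask & (1 << i)) != 0) {
                    subset.add(items.get(i));
                }
            }
            out.add(subset.toArray(new String[0]));
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [id==1], [name==a], [id==1, name==a]
        for (String[] key : combinations(Arrays.asList("id==1", "name==a"))) {
            System.out.println(Arrays.toString(key));
        }
    }
}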

AbstractOptionalOperation.java

@@ -91,7 +91,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
cacheResult = checkCache(uow, facets);
if (cacheResult != null) {
result = Optional.of(cacheResult);
}
}
}
if (!result.isPresent()) {

AbstractStatementOperation.java

@@ -47,7 +47,8 @@ import net.helenus.support.HelenusException;
public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>> extends Operation<E> {
final Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger LOG = LoggerFactory.getLogger(AbstractStatementOperation.class);
protected boolean enableCache = true;
protected boolean showValues = true;
protected TraceContext traceContext;
@@ -326,14 +327,14 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
optionalCachedResult = uow.cacheLookup(facets);
if (optionalCachedResult.isPresent()) {
uowCacheHits.mark();
logger.info("UnitOfWork({}) cache hit using facets", uow.hashCode());
LOG.info("UnitOfWork({}) cache hit using facets", uow.hashCode());
result = (E) optionalCachedResult.get();
}
}
if (result == null) {
uowCacheMiss.mark();
logger.info("UnitOfWork({}) cache miss", uow.hashCode());
LOG.info("UnitOfWork({}) cache miss", uow.hashCode());
}
return result;
@@ -341,19 +342,19 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
protected void updateCache(UnitOfWork<?> uow, E pojo, List<Facet> identifyingFacets) {
List<Facet> facets = new ArrayList<>();
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
for (Facet facet : identifyingFacets) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(prop, value.toString());
} else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
}
if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(prop, value.toString());
} else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
}
facets.add(binder.bind());
});
} else {

Operation.java

@@ -68,11 +68,16 @@ public abstract class Operation<E> {
}
Statement statement = options(buildStatement(cached));
Stopwatch timer = uow.getExecutionTimer();
timer.start();
Stopwatch timer = null;
if (uow != null) {
timer = uow.getExecutionTimer();
timer.start();
}
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
ResultSet resultSet = futureResultSet.getUninterruptibly(); //TODO(gburd): (timeout, units);
timer.stop();
if (uow != null) timer.stop();
return resultSet;
} finally {
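
The guard above only touches the unit-of-work execution timer when a UnitOfWork is actually in scope, so statements issued outside a unit of work run untimed instead of hitting a NullPointerException. A tiny sketch of the pattern, with a stand-in for the driver call:

import com.google.common.base.Stopwatch;

public class GuardedTimerSketch {

    static String execute(Object uow, Stopwatch executionTimer) {
        Stopwatch timer = null;
        if (uow != null) {
            timer = executionTimer;
            timer.start();
        }
        String resultSet = "rows"; // pretend ResultSet from the driver
        if (uow != null) timer.stop();
        return resultSet;
    }

    public static void main(String[] args) {
        System.out.println(execute(null, null));                                 // no UOW: untimed
        System.out.println(execute(new Object(), Stopwatch.createUnstarted()));  // inside a UOW: timed
    }
}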

SelectOperation.java

@@ -41,9 +41,13 @@ import net.helenus.mapping.value.ColumnValueProvider;
import net.helenus.mapping.value.ValueProviderMap;
import net.helenus.support.Fun;
import net.helenus.support.HelenusMappingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, SelectOperation<E>> {
private static final Logger LOG = LoggerFactory.getLogger(SelectOperation.class);
protected final List<HelenusPropertyNode> props = new ArrayList<HelenusPropertyNode>();
protected Function<Row, E> rowMapper = null;
protected List<Ordering> ordering = null;
@@ -188,10 +192,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
Filter filter = filters.get(prop);
if (filter != null) {
Object[] postulates = filter.postulateValues();
@@ -277,7 +281,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
}
if (ifFilters != null && !ifFilters.isEmpty()) {
logger.error("onlyIf conditions " + ifFilters + " would be ignored in the statement " + select);
LOG.error("onlyIf conditions " + ifFilters + " would be ignored in the statement " + select);
}
if (allowFiltering) {

UnitOfWorkTest.java

@@ -69,6 +69,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
try (UnitOfWork uow = session.begin()) {
uow.setPurpose("testSelectAfterSelect");
// This should read from the database and return a Widget.
w1 =
session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null);
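
The test above tags the unit of work with a purpose so the timing line logged on commit or abort is attributable. A self-contained model of that behaviour — a sketch only, assuming the real UnitOfWork aborts (and logs "aborted") when closed without a commit; it omits the cache, nesting, and post-commit hooks:

import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;

public class PurposeTaggedUowSketch implements AutoCloseable {
    private final Stopwatch elapsed = Stopwatch.createStarted();
    private String purpose;
    private boolean committed;

    PurposeTaggedUowSketch setPurpose(String purpose) {
        this.purpose = purpose;
        return this;
    }

    void commit() {
        committed = true;
        elapsed.stop();
        logTimers("committed");
    }

    @Override
    public void close() {
        if (!committed) {
            elapsed.stop();
            logTimers("aborted");
        }
    }

    private void logTimers(String what) {
        double totalMs = elapsed.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
        System.out.println(String.format("UOW(%s)%s %s (total: %.3fms)",
                hashCode(), purpose == null ? "" : " " + purpose, what, totalMs));
    }

    public static void main(String[] args) {
        try (PurposeTaggedUowSketch uow = new PurposeTaggedUowSketch().setPurpose("testSelectAfterSelect")) {
            // ... run selects through the session with this unit of work ...
            uow.commit();
        }
    }
}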