Formatting.

This commit is contained in:
Greg Burd 2017-10-25 20:53:58 -04:00
parent 7535e9ade7
commit e5918cd1e8
9 changed files with 274 additions and 281 deletions

View file

@ -17,10 +17,8 @@ package net.helenus.core;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import net.helenus.support.Either;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -35,6 +33,7 @@ import net.helenus.core.cache.Facet;
import net.helenus.core.operation.Operation; import net.helenus.core.operation.Operation;
import net.helenus.mapping.value.ColumnValuePreparer; import net.helenus.mapping.value.ColumnValuePreparer;
import net.helenus.mapping.value.ColumnValueProvider; import net.helenus.mapping.value.ColumnValueProvider;
import net.helenus.support.Either;
import net.helenus.support.HelenusException; import net.helenus.support.HelenusException;
public abstract class AbstractSessionOperations { public abstract class AbstractSessionOperations {
@ -116,13 +115,13 @@ public abstract class AbstractSessionOperations {
} }
} }
private void logStatement(Statement statement, boolean showValues) { private void logStatement(Statement statement, boolean showValues) {
if (isShowCql()) { if (isShowCql()) {
printCql(Operation.queryString(statement, showValues)); printCql(Operation.queryString(statement, showValues));
} else if (LOG.isInfoEnabled()) { } else if (LOG.isInfoEnabled()) {
LOG.info("CQL> " + Operation.queryString(statement, showValues)); LOG.info("CQL> " + Operation.queryString(statement, showValues));
} }
} }
public Tracer getZipkinTracer() { public Tracer getZipkinTracer() {
return null; return null;
@ -132,7 +131,8 @@ public abstract class AbstractSessionOperations {
return null; return null;
} }
public void mergeCache(Table<String, String, Either<Object, List<Facet>>> uowCache) { } public void mergeCache(Table<String, String, Either<Object, List<Facet>>> uowCache) {
}
RuntimeException translateException(RuntimeException e) { RuntimeException translateException(RuntimeException e) {
if (e instanceof HelenusException) { if (e instanceof HelenusException) {

View file

@ -15,14 +15,12 @@
*/ */
package net.helenus.core; package net.helenus.core;
import static net.helenus.core.HelenusSession.deleted;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import net.helenus.core.cache.UnboundFacet;
import net.helenus.core.reflect.HelenusNamedProperty;
import net.helenus.mapping.HelenusProperty;
import net.helenus.support.Either;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,8 +32,7 @@ import com.google.common.collect.TreeTraverser;
import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet; import net.helenus.core.cache.Facet;
import net.helenus.support.Either;
import static net.helenus.core.HelenusSession.deleted;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */ /** Encapsulates the concept of a "transaction" as a unit-of-work. */
public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfWork<E>, AutoCloseable { public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfWork<E>, AutoCloseable {
@ -45,11 +42,7 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>(); private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>();
private final HelenusSession session; private final HelenusSession session;
private final AbstractUnitOfWork<E> parent; private final AbstractUnitOfWork<E> parent;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>(); private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
private boolean aborted = false;
private boolean committed = false;
private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
protected String purpose; protected String purpose;
protected int cacheHits = 0; protected int cacheHits = 0;
protected int cacheMisses = 0; protected int cacheMisses = 0;
@ -57,6 +50,9 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
protected Stopwatch elapsedTime; protected Stopwatch elapsedTime;
protected Map<String, Double> databaseTime = new HashMap<>(); protected Map<String, Double> databaseTime = new HashMap<>();
protected double cacheLookupTime = 0.0; protected double cacheLookupTime = 0.0;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
private boolean aborted = false;
private boolean committed = false;
protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork<E> parent) { protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork<E> parent) {
Objects.requireNonNull(session, "containing session cannot be null"); Objects.requireNonNull(session, "containing session cannot be null");
@ -168,10 +164,10 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
String columnName = facet.name() + "==" + facet.value(); String columnName = facet.name() + "==" + facet.value();
Either<Object, List<Facet>> eitherValue = cache.get(tableName, columnName); Either<Object, List<Facet>> eitherValue = cache.get(tableName, columnName);
if (eitherValue != null) { if (eitherValue != null) {
Object value = deleted; Object value = deleted;
if (eitherValue.isLeft()) { if (eitherValue.isLeft()) {
value = eitherValue.getLeft(); value = eitherValue.getLeft();
} }
result = Optional.of(value); result = Optional.of(value);
break; break;
} }
@ -188,44 +184,44 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
@Override @Override
public List<Facet> cacheEvict(List<Facet> facets) { public List<Facet> cacheEvict(List<Facet> facets) {
Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets); Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets);
String tableName = CacheUtil.schemaName(facets); String tableName = CacheUtil.schemaName(facets);
Optional<Object> optionalValue = cacheLookup(facets); Optional<Object> optionalValue = cacheLookup(facets);
if (optionalValue.isPresent()) { if (optionalValue.isPresent()) {
Object value = optionalValue.get(); Object value = optionalValue.get();
for (Facet facet : facets) { for (Facet facet : facets) {
if (!facet.fixed()) { if (!facet.fixed()) {
String columnKey = facet.name() + "==" + facet.value(); String columnKey = facet.name() + "==" + facet.value();
// mark the value identified by the facet to `deleted` // mark the value identified by the facet to `deleted`
cache.put(tableName, columnKey, deletedObjectFacets); cache.put(tableName, columnKey, deletedObjectFacets);
} }
} }
// look for other row/col pairs that referenced the same object, mark them // look for other row/col pairs that referenced the same object, mark them
// `deleted` // `deleted`
cache.columnKeySet().forEach(columnKey -> { cache.columnKeySet().forEach(columnKey -> {
Either<Object, List<Facet>> eitherCachedValue = cache.get(tableName, columnKey); Either<Object, List<Facet>> eitherCachedValue = cache.get(tableName, columnKey);
if (eitherCachedValue.isLeft()) { if (eitherCachedValue.isLeft()) {
Object cachedValue = eitherCachedValue.getLeft(); Object cachedValue = eitherCachedValue.getLeft();
if (cachedValue == value) { if (cachedValue == value) {
cache.put(tableName, columnKey, deletedObjectFacets); cache.put(tableName, columnKey, deletedObjectFacets);
String[] parts = columnKey.split("=="); String[] parts = columnKey.split("==");
facets.add(new Facet<String>(parts[0], parts[1])); facets.add(new Facet<String>(parts[0], parts[1]));
} }
} }
}); });
} }
return facets; return facets;
} }
@Override @Override
public void cacheUpdate(Object value, List<Facet> facets) { public void cacheUpdate(Object value, List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets); String tableName = CacheUtil.schemaName(facets);
for (Facet facet : facets) { for (Facet facet : facets) {
if (!facet.fixed()) { if (!facet.fixed()) {
String columnName = facet.name() + "==" + facet.value(); String columnName = facet.name() + "==" + facet.value();
cache.put(tableName, columnName, Either.left(value)); cache.put(tableName, columnName, Either.left(value));
} }
} }
} }
@ -265,42 +261,42 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
aborted = false; aborted = false;
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit)); nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
elapsedTime.stop(); elapsedTime.stop();
if (parent == null) { if (parent == null) {
// Apply all post-commit functions, this is the outter-most UnitOfWork. // Apply all post-commit functions, this is the outter-most UnitOfWork.
traverser.postOrderTraversal(this).forEach(uow -> { traverser.postOrderTraversal(this).forEach(uow -> {
uow.applyPostCommitFunctions(); uow.applyPostCommitFunctions();
}); });
// Merge our cache into the session cache. // Merge our cache into the session cache.
session.mergeCache(cache); session.mergeCache(cache);
return new PostCommitFunction(this, null); return new PostCommitFunction(this, null);
} else { } else {
// Merge cache and statistics into parent if there is one. // Merge cache and statistics into parent if there is one.
parent.mergeCache(cache); parent.mergeCache(cache);
parent.cacheHits += cacheHits; parent.cacheHits += cacheHits;
parent.cacheMisses += cacheMisses; parent.cacheMisses += cacheMisses;
parent.databaseLookups += databaseLookups; parent.databaseLookups += databaseLookups;
parent.cacheLookupTime += cacheLookupTime; parent.cacheLookupTime += cacheLookupTime;
for (String name : databaseTime.keySet()) { for (String name : databaseTime.keySet()) {
if (parent.databaseTime.containsKey(name)) { if (parent.databaseTime.containsKey(name)) {
double t = parent.databaseTime.get(name); double t = parent.databaseTime.get(name);
parent.databaseTime.put(name, t + databaseTime.get(name)); parent.databaseTime.put(name, t + databaseTime.get(name));
} else { } else {
parent.databaseTime.put(name, databaseTime.get(name)); parent.databaseTime.put(name, databaseTime.get(name));
} }
} }
} }
} }
// else { // else {
// Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass); // Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass);
// T object = ctor.newInstance(new Object[] { String message }); // T object = ctor.newInstance(new Object[] { String message });
// } // }
return new PostCommitFunction(this, postCommit); return new PostCommitFunction(this, postCommit);
} }
/* Explicitly discard the work and mark it as as such in the log. */ /* Explicitly discard the work and mark it as as such in the log. */
@ -325,8 +321,9 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
from.rowMap().forEach((rowKey, columnMap) -> { from.rowMap().forEach((rowKey, columnMap) -> {
columnMap.forEach((columnKey, value) -> { columnMap.forEach((columnKey, value) -> {
if (to.contains(rowKey, columnKey)) { if (to.contains(rowKey, columnKey)) {
//TODO(gburd):... // TODO(gburd):...
to.put(rowKey, columnKey, Either.left(CacheUtil.merge(to.get(rowKey, columnKey).getLeft(), from.get(rowKey, columnKey).getLeft()))); to.put(rowKey, columnKey, Either.left(CacheUtil.merge(to.get(rowKey, columnKey).getLeft(),
from.get(rowKey, columnKey).getLeft())));
} else { } else {
to.put(rowKey, columnKey, from.get(rowKey, columnKey)); to.put(rowKey, columnKey, from.get(rowKey, columnKey));
} }

View file

@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import net.helenus.support.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -49,15 +48,15 @@ import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusProperty; import net.helenus.mapping.HelenusProperty;
import net.helenus.mapping.MappingUtil; import net.helenus.mapping.MappingUtil;
import net.helenus.mapping.value.*; import net.helenus.mapping.value.*;
import net.helenus.support.*;
import net.helenus.support.Fun.Tuple1; import net.helenus.support.Fun.Tuple1;
import net.helenus.support.Fun.Tuple2; import net.helenus.support.Fun.Tuple2;
import net.helenus.support.Fun.Tuple6; import net.helenus.support.Fun.Tuple6;
public final class HelenusSession extends AbstractSessionOperations implements Closeable { public final class HelenusSession extends AbstractSessionOperations implements Closeable {
public static final Object deleted = new Object();
private static final Logger LOG = LoggerFactory.getLogger(HelenusSession.class); private static final Logger LOG = LoggerFactory.getLogger(HelenusSession.class);
public static final Object deleted = new Object();
private final int MAX_CACHE_SIZE = 10000; private final int MAX_CACHE_SIZE = 10000;
private final int MAX_CACHE_EXPIRE_SECONDS = 600; private final int MAX_CACHE_EXPIRE_SECONDS = 600;
@ -234,62 +233,62 @@ public final class HelenusSession extends AbstractSessionOperations implements C
boundFacets.add(facet); boundFacets.add(facet);
} }
} }
String tableName = CacheUtil.schemaName(facets); String tableName = CacheUtil.schemaName(facets);
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets); List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
mergeAndUpdateCacheValues(pojo, tableName, facetCombinations); mergeAndUpdateCacheValues(pojo, tableName, facetCombinations);
} }
@Override @Override
public void mergeCache(Table<String, String, Either<Object, List<Facet>>> uowCache) { public void mergeCache(Table<String, String, Either<Object, List<Facet>>> uowCache) {
List<Either<Object, List<Facet>>> items = uowCache.values().stream().distinct().collect(Collectors.toList()); List<Either<Object, List<Facet>>> items = uowCache.values().stream().distinct().collect(Collectors.toList());
for (Either<Object, List<Facet>> item : items) { for (Either<Object, List<Facet>> item : items) {
if (item.isRight()) { if (item.isRight()) {
List<Facet> facets = item.getRight(); List<Facet> facets = item.getRight();
String tableName = CacheUtil.schemaName(facets); String tableName = CacheUtil.schemaName(facets);
List<String[]> combinations = CacheUtil.flattenFacets(facets); List<String[]> combinations = CacheUtil.flattenFacets(facets);
for (String[] combination : combinations) { for (String[] combination : combinations) {
String cacheKey = tableName + "." + Arrays.toString(combination); String cacheKey = tableName + "." + Arrays.toString(combination);
sessionCache.invalidate(cacheKey); sessionCache.invalidate(cacheKey);
} }
} else { } else {
Object pojo = item.getLeft(); Object pojo = item.getLeft();
HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo)); HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
if (entity.isCacheable()) { if (entity.isCacheable()) {
List<Facet> boundFacets = new ArrayList<>(); List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : entity.getFacets()) { for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) { if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet; UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder(); UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> { unboundFacet.getProperties().forEach(prop -> {
if (valueMap == null) { if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop,
false); false);
binder.setValueForProperty(prop, value.toString()); binder.setValueForProperty(prop, value.toString());
} else { } else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString()); binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
} }
}); });
if (binder.isBound()) { if (binder.isBound()) {
boundFacets.add(binder.bind()); boundFacets.add(binder.bind());
} }
} else { } else {
boundFacets.add(facet); boundFacets.add(facet);
} }
} }
// NOTE: should equal `String tableName = CacheUtil.schemaName(facets);` // NOTE: should equal `String tableName = CacheUtil.schemaName(facets);`
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets); List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
String tableName = CacheUtil.schemaName(boundFacets); String tableName = CacheUtil.schemaName(boundFacets);
mergeAndUpdateCacheValues(pojo, tableName, facetCombinations); mergeAndUpdateCacheValues(pojo, tableName, facetCombinations);
} }
} }
} }
} }
private void mergeAndUpdateCacheValues(Object pojo, String tableName, List<String[]> facetCombinations) { private void mergeAndUpdateCacheValues(Object pojo, String tableName, List<String[]> facetCombinations) {
Object merged = null; Object merged = null;
for (String[] combination : facetCombinations) { for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination); String cacheKey = tableName + "." + Arrays.toString(combination);
Object value = sessionCache.getIfPresent(cacheKey); Object value = sessionCache.getIfPresent(cacheKey);
if (value == null) { if (value == null) {
sessionCache.put(cacheKey, pojo); sessionCache.put(cacheKey, pojo);

View file

@ -15,6 +15,8 @@
*/ */
package net.helenus.core.operation; package net.helenus.core.operation;
import static net.helenus.core.HelenusSession.deleted;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -32,8 +34,6 @@ import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet; import net.helenus.core.cache.Facet;
import static net.helenus.core.HelenusSession.deleted;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>> public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends extends
AbstractStatementOperation<E, O> { AbstractStatementOperation<E, O> {
@ -118,41 +118,41 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
try { try {
List<Facet> facets = bindFacetValues(); List<Facet> facets = bindFacetValues();
if (facets != null) { if (facets != null) {
cachedResult = checkCache(uow, facets); cachedResult = checkCache(uow, facets);
if (cachedResult != null) { if (cachedResult != null) {
updateCache = false; updateCache = false;
result = Optional.of(cachedResult); result = Optional.of(cachedResult);
uowCacheHits.mark(); uowCacheHits.mark();
cacheHits.mark(); cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0); uow.recordCacheAndDatabaseOperationCount(1, 0);
} else { } else {
updateCache = true; updateCache = true;
uowCacheMiss.mark(); uowCacheMiss.mark();
if (isSessionCacheable()) { if (isSessionCacheable()) {
String tableName = CacheUtil.schemaName(facets); String tableName = CacheUtil.schemaName(facets);
cachedResult = (E) sessionOps.checkCache(tableName, facets); cachedResult = (E) sessionOps.checkCache(tableName, facets);
if (cachedResult != null) { if (cachedResult != null) {
result = Optional.of(cachedResult); result = Optional.of(cachedResult);
sessionCacheHits.mark(); sessionCacheHits.mark();
cacheHits.mark(); cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0); uow.recordCacheAndDatabaseOperationCount(1, 0);
} else { } else {
sessionCacheMiss.mark(); sessionCacheMiss.mark();
cacheMiss.mark(); cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0); uow.recordCacheAndDatabaseOperationCount(-1, 0);
} }
} }
} }
} else { } else {
updateCache = false; updateCache = false;
} }
} finally { } finally {
timer.stop(); timer.stop();
uow.addCacheLookupTime(timer); uow.addCacheLookupTime(timer);
} }
} else { } else {
updateCache = false; updateCache = false;
} }
if (!result.isPresent()) { if (!result.isPresent()) {
// Formulate the query and execute it against the Cassandra cluster. // Formulate the query and execute it against the Cassandra cluster.
@ -163,16 +163,16 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
result = transform(resultSet); result = transform(resultSet);
} }
if (result.get() == deleted) { if (result.get() == deleted) {
return Optional.empty(); return Optional.empty();
} else { } else {
// If we have a result, it wasn't from the UOW cache, and we're caching things // If we have a result, it wasn't from the UOW cache, and we're caching things
// then we need to put this result into the cache for future requests to find. // then we need to put this result into the cache for future requests to find.
if (updateCache && result.isPresent()) { if (updateCache && result.isPresent()) {
cacheUpdate(uow, result.get(), getFacets()); cacheUpdate(uow, result.get(), getFacets());
} }
return result; return result;
} }
} finally { } finally {
context.stop(); context.stop();
} }

View file

@ -15,6 +15,8 @@
*/ */
package net.helenus.core.operation; package net.helenus.core.operation;
import static net.helenus.core.HelenusSession.deleted;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -33,8 +35,6 @@ import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet; import net.helenus.core.cache.Facet;
import static net.helenus.core.HelenusSession.deleted;
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>> public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
extends extends
AbstractStatementOperation<E, O> { AbstractStatementOperation<E, O> {
@ -124,41 +124,41 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
try { try {
List<Facet> facets = bindFacetValues(); List<Facet> facets = bindFacetValues();
if (facets != null) { if (facets != null) {
cachedResult = checkCache(uow, facets); cachedResult = checkCache(uow, facets);
if (cachedResult != null) { if (cachedResult != null) {
updateCache = false; updateCache = false;
resultStream = Stream.of(cachedResult); resultStream = Stream.of(cachedResult);
uowCacheHits.mark(); uowCacheHits.mark();
cacheHits.mark(); cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0); uow.recordCacheAndDatabaseOperationCount(1, 0);
} else { } else {
updateCache = true; updateCache = true;
uowCacheMiss.mark(); uowCacheMiss.mark();
if (isSessionCacheable()) { if (isSessionCacheable()) {
String tableName = CacheUtil.schemaName(facets); String tableName = CacheUtil.schemaName(facets);
cachedResult = (E) sessionOps.checkCache(tableName, facets); cachedResult = (E) sessionOps.checkCache(tableName, facets);
if (cachedResult != null) { if (cachedResult != null) {
resultStream = Stream.of(cachedResult); resultStream = Stream.of(cachedResult);
sessionCacheHits.mark(); sessionCacheHits.mark();
cacheHits.mark(); cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0); uow.recordCacheAndDatabaseOperationCount(1, 0);
} else { } else {
sessionCacheMiss.mark(); sessionCacheMiss.mark();
cacheMiss.mark(); cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0); uow.recordCacheAndDatabaseOperationCount(-1, 0);
} }
} }
} }
} else { } else {
updateCache = false; updateCache = false;
} }
} finally { } finally {
timer.stop(); timer.stop();
uow.addCacheLookupTime(timer); uow.addCacheLookupTime(timer);
} }
} else { } else {
updateCache = false; updateCache = false;
} }
if (resultStream == null) { if (resultStream == null) {
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
@ -169,19 +169,19 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
// If we have a result and we're caching then we need to put it into the cache // If we have a result and we're caching then we need to put it into the cache
// for future requests to find. // for future requests to find.
if (resultStream != null) { if (resultStream != null) {
List<E> again = new ArrayList<>(); List<E> again = new ArrayList<>();
List<Facet> facets = getFacets(); List<Facet> facets = getFacets();
resultStream.forEach(result -> { resultStream.forEach(result -> {
if (result != deleted) { if (result != deleted) {
if (updateCache) { if (updateCache) {
cacheUpdate(uow, result, facets); cacheUpdate(uow, result, facets);
} }
again.add(result); again.add(result);
} }
}); });
resultStream = again.stream(); resultStream = again.stream();
} }
return resultStream; return resultStream;
} finally { } finally {
context.stop(); context.stop();
} }

View file

@ -133,9 +133,9 @@ public final class DeleteOperation extends AbstractFilterOperation<ResultSet, De
} }
public List<Facet> bindFacetValues(List<Facet> facets) { public List<Facet> bindFacetValues(List<Facet> facets) {
if (facets == null) { if (facets == null) {
return new ArrayList<Facet>(); return new ArrayList<Facet>();
} }
List<Facet> boundFacets = new ArrayList<>(); List<Facet> boundFacets = new ArrayList<>();
Map<HelenusProperty, Filter> filterMap = new HashMap<>(filters.size()); Map<HelenusProperty, Filter> filterMap = new HashMap<>(filters.size());
filters.forEach(f -> filterMap.put(f.getNode().getProperty(), f)); filters.forEach(f -> filterMap.put(f.getNode().getProperty(), f));
@ -177,9 +177,9 @@ public final class DeleteOperation extends AbstractFilterOperation<ResultSet, De
} }
@Override @Override
public List<Facet> getFacets() { public List<Facet> getFacets() {
return entity.getFacets(); return entity.getFacets();
} }
@Override @Override
public ResultSet sync(UnitOfWork uow) {// throws TimeoutException { public ResultSet sync(UnitOfWork uow) {// throws TimeoutException {
@ -187,8 +187,8 @@ public final class DeleteOperation extends AbstractFilterOperation<ResultSet, De
return sync(); return sync();
} }
ResultSet result = super.sync(uow); ResultSet result = super.sync(uow);
List<Facet> facets = getFacets(); List<Facet> facets = getFacets();
uow.cacheEvict(bindFacetValues(facets)); uow.cacheEvict(bindFacetValues(facets));
return result; return result;
} }

View file

@ -19,17 +19,17 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement; import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import brave.Span; import brave.Span;
@ -53,19 +53,39 @@ public abstract class Operation<E> {
protected final Timer requestLatency; protected final Timer requestLatency;
Operation(AbstractSessionOperations sessionOperations) { Operation(AbstractSessionOperations sessionOperations) {
this.sessionOps = sessionOperations; this.sessionOps = sessionOperations;
MetricRegistry metrics = sessionOperations.getMetricRegistry(); MetricRegistry metrics = sessionOperations.getMetricRegistry();
if (metrics == null) { if (metrics == null) {
metrics = new MetricRegistry(); metrics = new MetricRegistry();
} }
this.uowCacheHits = metrics.meter("net.helenus.UOW-cache-hits"); this.uowCacheHits = metrics.meter("net.helenus.UOW-cache-hits");
this.uowCacheMiss = metrics.meter("net.helenus.UOW-cache-miss"); this.uowCacheMiss = metrics.meter("net.helenus.UOW-cache-miss");
this.sessionCacheHits = metrics.meter("net.helenus.session-cache-hits"); this.sessionCacheHits = metrics.meter("net.helenus.session-cache-hits");
this.sessionCacheMiss = metrics.meter("net.helenus.session-cache-miss"); this.sessionCacheMiss = metrics.meter("net.helenus.session-cache-miss");
this.cacheHits = metrics.meter("net.helenus.cache-hits"); this.cacheHits = metrics.meter("net.helenus.cache-hits");
this.cacheMiss = metrics.meter("net.helenus.cache-miss"); this.cacheMiss = metrics.meter("net.helenus.cache-miss");
this.requestLatency = metrics.timer("net.helenus.request-latency"); this.requestLatency = metrics.timer("net.helenus.request-latency");
} }
public static String queryString(Statement statement, boolean includeValues) {
String query = null;
if (statement instanceof BuiltStatement) {
BuiltStatement builtStatement = (BuiltStatement) statement;
if (includeValues) {
RegularStatement regularStatement = builtStatement.setForceNoValues(true);
query = regularStatement.getQueryString();
} else {
query = builtStatement.getQueryString();
}
} else if (statement instanceof RegularStatement) {
RegularStatement regularStatement = (RegularStatement) statement;
query = regularStatement.getQueryString();
} else {
query = statement.toString();
}
return query;
}
public ResultSet execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, long timeout, public ResultSet execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, long timeout,
TimeUnit units, boolean showValues, boolean cached) { // throws TimeoutException { TimeUnit units, boolean showValues, boolean cached) { // throws TimeoutException {
@ -109,40 +129,17 @@ public abstract class Operation<E> {
} }
} }
public static String queryString(Statement statement, boolean includeValues) { void log(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
String query = null;
if (statement instanceof BuiltStatement) {
BuiltStatement builtStatement = (BuiltStatement) statement;
if (includeValues) {
RegularStatement regularStatement = builtStatement.setForceNoValues(true);
query = regularStatement.getQueryString();
} else {
query = builtStatement.getQueryString();
}
} else if (statement instanceof RegularStatement) {
RegularStatement regularStatement = (RegularStatement) statement;
query = regularStatement.getQueryString();
} else {
query = statement.toString();
}
return query;
}
void log(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
String uowString = ""; String uowString = "";
if (uow != null) { if (uow != null) {
uowString = "UOW(" + uow.hashCode() + ")"; uowString = "UOW(" + uow.hashCode() + ")";
} }
String timerString = ""; String timerString = "";
if (timer != null) { if (timer != null) {
timerString = String.format(" %s ", timer.toString()); timerString = String.format(" %s ", timer.toString());
} }
LOG.info(String.format("%s%s%s", LOG.info(String.format("%s%s%s", uowString, timerString, Operation.queryString(statement, false)));
uowString,
timerString,
Operation.queryString(statement, false)));
} }
} }

View file

@ -250,7 +250,8 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
+ entity.getMappingInterface() + " or " + prop.getEntity().getMappingInterface()); + entity.getMappingInterface() + " or " + prop.getEntity().getMappingInterface());
} }
//TODO(gburd): writeTime and ttl will be useful on merge() but cause object identity to fail. // TODO(gburd): writeTime and ttl will be useful on merge() but cause object
// identity to fail.
if (false && cached) { if (false && cached) {
switch (prop.getProperty().getColumnType()) { switch (prop.getProperty().getColumnType()) {
case PARTITION_KEY : case PARTITION_KEY :

View file

@ -64,37 +64,36 @@ public class DslInvocationHandler<E> implements InvocationHandler {
private HelenusEntity init(Metadata metadata) { private HelenusEntity init(Metadata metadata) {
HelenusEntity entity = new HelenusMappingEntity(iface, metadata); HelenusEntity entity = new HelenusMappingEntity(iface, metadata);
Collection<HelenusProperty> properties = entity.getOrderedProperties(); Collection<HelenusProperty> properties = entity.getOrderedProperties();
if (properties != null) { if (properties != null) {
for (HelenusProperty prop : properties) { for (HelenusProperty prop : properties) {
map.put(prop.getGetterMethod(), prop); map.put(prop.getGetterMethod(), prop);
AbstractDataType type = prop.getDataType(); AbstractDataType type = prop.getDataType();
Class<?> javaType = prop.getJavaType(); Class<?> javaType = prop.getJavaType();
if (type instanceof UDTDataType && !UDTValue.class.isAssignableFrom(javaType)) { if (type instanceof UDTDataType && !UDTValue.class.isAssignableFrom(javaType)) {
Object childDsl = Helenus.dsl(javaType, classLoader, Object childDsl = Helenus.dsl(javaType, classLoader,
Optional.of(new HelenusPropertyNode(prop, parent)), Optional.of(new HelenusPropertyNode(prop, parent)), metadata);
metadata);
udtMap.put(prop.getGetterMethod(), childDsl); udtMap.put(prop.getGetterMethod(), childDsl);
} }
if (type instanceof DTDataType) { if (type instanceof DTDataType) {
DTDataType dataType = (DTDataType) type; DTDataType dataType = (DTDataType) type;
if (dataType.getDataType() instanceof TupleType && !TupleValue.class.isAssignableFrom(javaType)) { if (dataType.getDataType() instanceof TupleType && !TupleValue.class.isAssignableFrom(javaType)) {
Object childDsl = Helenus.dsl(javaType, classLoader, Object childDsl = Helenus.dsl(javaType, classLoader,
Optional.of(new HelenusPropertyNode(prop, parent)), metadata); Optional.of(new HelenusPropertyNode(prop, parent)), metadata);
tupleMap.put(prop.getGetterMethod(), childDsl); tupleMap.put(prop.getGetterMethod(), childDsl);
} }
} }
} }
} }
return entity; return entity;
} }