WIP: more progress toward session cache.
This commit is contained in:
parent
fc83b8032c
commit
682a1a304e
7 changed files with 41 additions and 28 deletions
|
@ -90,9 +90,11 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
public void logTimers(String what) {
|
||||
double e = (double)elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
|
||||
double d = (double)databaseTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
|
||||
double f = ((double)databaseTime_.elapsed(TimeUnit.NANOSECONDS)) / ((double)elapsedTime_.elapsed(TimeUnit.NANOSECONDS)) * 100.0;
|
||||
LOG.info(String.format("UOW(%s)%s %s (total: %.3fms db: %.3fms or %2.2f%% of total time)",
|
||||
hashCode(), (purpose_ == null ? "" : " " + purpose_), what, e, d, f));
|
||||
double c = (double)cacheLookupTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
|
||||
double fd = (d / (e - c)) * 100.0;
|
||||
double fc = (c / (e - d)) * 100.0;
|
||||
LOG.info(String.format("UOW(%s)%s %s (total: %.3fms cache: %.3fms %2.2f%% db: %.3fms %2.2f%%)",
|
||||
hashCode(), (purpose_ == null ? "" : " " + purpose_), what, e, c, fc, d, fd));
|
||||
}
|
||||
|
||||
private void applyPostCommitFunctions() {
|
||||
|
@ -106,21 +108,22 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
|
||||
@Override
|
||||
public Optional<Object> cacheLookup(List<Facet> facets) {
|
||||
Facet table = facets.remove(0);
|
||||
String tableName = table.value().toString();
|
||||
String tableName = CacheUtil.schemaName(facets);
|
||||
Optional<Object> result = Optional.empty();
|
||||
for (Facet facet : facets) {
|
||||
String columnName = facet.name() + "==" + facet.value();
|
||||
Object value = cache.get(tableName, columnName);
|
||||
if (value != null) {
|
||||
if (result.isPresent() && result.get() != value) {
|
||||
// One facet matched, but another did not.
|
||||
result = Optional.empty();
|
||||
break;
|
||||
} else {
|
||||
result = Optional.of(value);
|
||||
}
|
||||
}
|
||||
if (!facet.fixed()) {
|
||||
String columnName = facet.name() + "==" + facet.value();
|
||||
Object value = cache.get(tableName, columnName);
|
||||
if (value != null) {
|
||||
if (result.isPresent() && result.get() != value) {
|
||||
// One facet matched, but another did not.
|
||||
result = Optional.empty();
|
||||
break;
|
||||
} else {
|
||||
result = Optional.of(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!result.isPresent()) {
|
||||
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
|
||||
|
|
|
@ -216,8 +216,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
|
|||
boundFacets.add(facet);
|
||||
}
|
||||
}
|
||||
Facet table = boundFacets.remove(0);
|
||||
String tableName = table.value().toString();
|
||||
String tableName = CacheUtil.schemaName(facets);
|
||||
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
|
||||
Object value = sessionCache.getIfPresent(pojo);
|
||||
Object mergedValue = null;
|
||||
|
@ -265,9 +264,8 @@ public final class HelenusSession extends AbstractSessionOperations implements C
|
|||
boundFacets.add(facet);
|
||||
}
|
||||
}
|
||||
//String tableName = entity.getName().toCql();
|
||||
Facet table = boundFacets.remove(0);
|
||||
String tableName = table.value().toString();
|
||||
String tableName = entity.getName().toCql();
|
||||
// NOTE: should equal `String tableName = CacheUtil.schemaName(facets);`
|
||||
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
|
||||
Object value = sessionCache.getIfPresent(pojo);
|
||||
Object mergedValue = null;
|
||||
|
|
|
@ -31,6 +31,7 @@ public class CacheUtil {
|
|||
|
||||
public static List<String[]> flattenFacets(List<Facet> facets) {
|
||||
List<String[]> combinations = CacheUtil.combinations(facets.stream()
|
||||
.filter(facet -> !facet.fixed())
|
||||
.filter(facet -> facet.value() != null)
|
||||
.map(facet -> {
|
||||
return facet.name() + "==" + facet.value();
|
||||
|
@ -42,4 +43,10 @@ public class CacheUtil {
|
|||
return to; // TODO(gburd): yeah...
|
||||
}
|
||||
|
||||
public static String schemaName(List<Facet> facets) {
|
||||
return facets.stream().filter(Facet::fixed)
|
||||
.map(facet -> facet.value().toString())
|
||||
.collect(Collectors.joining("."));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package net.helenus.core.cache;
|
|||
public class Facet<T> {
|
||||
private final String name;
|
||||
private T value;
|
||||
private boolean fixed = false;
|
||||
|
||||
public Facet(String name) {
|
||||
this.name = name;
|
||||
|
@ -40,4 +41,8 @@ public class Facet<T> {
|
|||
return value;
|
||||
}
|
||||
|
||||
public Facet setFixed() { fixed = true; return this; }
|
||||
|
||||
public boolean fixed() { return fixed; }
|
||||
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.util.Optional;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.datastax.driver.core.PreparedStatement;
|
||||
|
@ -38,6 +39,7 @@ import com.google.common.util.concurrent.ListenableFuture;
|
|||
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.core.UnitOfWork;
|
||||
import net.helenus.core.cache.CacheUtil;
|
||||
import net.helenus.core.cache.Facet;
|
||||
|
||||
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
|
||||
|
@ -74,8 +76,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
|
|||
|
||||
if (enableCache) {
|
||||
List<Facet> facets = bindFacetValues();
|
||||
Facet table = facets.remove(0);
|
||||
String tableName = table.value().toString();
|
||||
String tableName = CacheUtil.schemaName(facets);
|
||||
cacheResult = (E)sessionOps.checkCache(tableName, facets);
|
||||
if (cacheResult != null) {
|
||||
result = Optional.of(cacheResult);
|
||||
|
@ -122,8 +123,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
|
|||
result = Optional.of(cacheResult);
|
||||
updateCache = false;
|
||||
} else {
|
||||
Facet table = facets.remove(0);
|
||||
String tableName = table.value().toString();
|
||||
String tableName = CacheUtil.schemaName(facets);
|
||||
cacheResult = (E)sessionOps.checkCache(tableName, facets);
|
||||
if (cacheResult != null) {
|
||||
result = Optional.of(cacheResult);
|
||||
|
|
|
@ -32,6 +32,7 @@ import com.google.common.util.concurrent.ListenableFuture;
|
|||
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.core.UnitOfWork;
|
||||
import net.helenus.core.cache.CacheUtil;
|
||||
import net.helenus.core.cache.Facet;
|
||||
|
||||
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
|
||||
|
@ -68,8 +69,7 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
|
|||
|
||||
if (enableCache) {
|
||||
List<Facet> facets = bindFacetValues();
|
||||
Facet table = facets.remove(0);
|
||||
String tableName = table.value().toString();
|
||||
String tableName = CacheUtil.schemaName(facets);
|
||||
cacheResult = (E) sessionOps.checkCache(tableName, facets);
|
||||
if (cacheResult != null) {
|
||||
resultStream = Stream.of(cacheResult);
|
||||
|
|
|
@ -113,7 +113,7 @@ public final class HelenusMappingEntity implements HelenusEntity {
|
|||
|
||||
List<HelenusProperty> primaryKeyProperties = new ArrayList<>();
|
||||
ImmutableList.Builder<Facet> facetsBuilder = ImmutableList.builder();
|
||||
facetsBuilder.add(new Facet("table", name.toCql()));
|
||||
facetsBuilder.add(new Facet("table", name.toCql()).setFixed());
|
||||
for (HelenusProperty prop : orderedProps) {
|
||||
switch (prop.getColumnType()) {
|
||||
case PARTITION_KEY :
|
||||
|
|
Loading…
Reference in a new issue