Formatting.

This commit is contained in:
Greg Burd 2017-10-23 14:41:20 -04:00
parent d25061366b
commit 7c683acb56
20 changed files with 435 additions and 433 deletions

View file

@ -19,25 +19,24 @@ import java.io.PrintStream;
import java.util.List;
import java.util.concurrent.Executor;
import com.google.common.cache.Cache;
import com.google.common.collect.Table;
import net.helenus.core.cache.Facet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.ListenableFuture;
import brave.Tracer;
import net.helenus.core.cache.Facet;
import net.helenus.mapping.value.ColumnValuePreparer;
import net.helenus.mapping.value.ColumnValueProvider;
import net.helenus.support.HelenusException;
public abstract class AbstractSessionOperations {
private static final Logger LOG = LoggerFactory.getLogger(AbstractSessionOperations.class);
private static final Logger LOG = LoggerFactory.getLogger(AbstractSessionOperations.class);
public abstract Session currentSession();
@ -120,7 +119,8 @@ public abstract class AbstractSessionOperations {
return null;
}
public void mergeCache(Table<String, String, Object> cache) {}
public void mergeCache(Table<String, String, Object> cache) {
}
RuntimeException translateException(RuntimeException e) {
if (e instanceof HelenusException) {
@ -129,9 +129,12 @@ public abstract class AbstractSessionOperations {
throw new HelenusException(e);
}
public Object checkCache(String tableName, List<Facet> facets) { return null; }
public Object checkCache(String tableName, List<Facet> facets) {
return null;
}
public void updateCache(Object pojo, List<Facet> facets) { }
public void updateCache(Object pojo, List<Facet> facets) {
}
void printCql(String cql) {
getPrintStream().println(cql);

View file

@ -18,6 +18,9 @@ package net.helenus.core;
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.diffplug.common.base.Errors;
import com.google.common.base.Stopwatch;
import com.google.common.collect.HashBasedTable;
@ -26,30 +29,25 @@ import com.google.common.collect.TreeTraverser;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfWork<E>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractUnitOfWork.class);
private static final Logger LOG = LoggerFactory.getLogger(AbstractUnitOfWork.class);
private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>();
private final HelenusSession session;
private final AbstractUnitOfWork<E> parent;
// Cache:
private final Table<String, String, Object> cache = HashBasedTable.create();
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
private boolean aborted = false;
private boolean committed = false;
private String purpose_;
private Stopwatch elapsedTime_;
private Stopwatch databaseTime_ = Stopwatch.createUnstarted();
private Stopwatch cacheLookupTime_ = Stopwatch.createUnstarted();
// Cache:
private final Table<String, String, Object> cache = HashBasedTable.create();
protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork<E> parent) {
Objects.requireNonNull(session, "containing session cannot be null");
@ -57,17 +55,17 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
this.parent = parent;
}
@Override
public Stopwatch getExecutionTimer() {
return databaseTime_;
}
@Override
public Stopwatch getExecutionTimer() {
return databaseTime_;
}
@Override
public Stopwatch getCacheLookupTimer() {
return cacheLookupTime_;
}
@Override
public Stopwatch getCacheLookupTimer() {
return cacheLookupTime_;
}
@Override
@Override
public void addNestedUnitOfWork(UnitOfWork<E> uow) {
synchronized (nested) {
nested.add((AbstractUnitOfWork<E>) uow);
@ -82,54 +80,54 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
}
@Override
public UnitOfWork setPurpose(String purpose) {
purpose_ = purpose;
return this;
}
public UnitOfWork setPurpose(String purpose) {
purpose_ = purpose;
return this;
}
public void logTimers(String what) {
double e = (double)elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double d = (double)databaseTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double c = (double)cacheLookupTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double fd = (d / (e - c)) * 100.0;
double fc = (c / (e - d)) * 100.0;
LOG.info(String.format("UOW(%s)%s %s (total: %.3fms cache: %.3fms %2.2f%% db: %.3fms %2.2f%%)",
hashCode(), (purpose_ == null ? "" : " " + purpose_), what, e, c, fc, d, fd));
}
public void logTimers(String what) {
double e = (double) elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double d = (double) databaseTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double c = (double) cacheLookupTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double fd = (d / (e - c)) * 100.0;
double fc = (c / (e - d)) * 100.0;
LOG.info(String.format("UOW(%s)%s %s (total: %.3fms cache: %.3fms %2.2f%% db: %.3fms %2.2f%%)", hashCode(),
(purpose_ == null ? "" : " " + purpose_), what, e, c, fc, d, fd));
}
private void applyPostCommitFunctions() {
private void applyPostCommitFunctions() {
if (!postCommit.isEmpty()) {
for (CommitThunk f : postCommit) {
f.apply();
}
}
logTimers("committed");
logTimers("committed");
}
@Override
public Optional<Object> cacheLookup(List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets);
String tableName = CacheUtil.schemaName(facets);
Optional<Object> result = Optional.empty();
for (Facet facet : facets) {
if (!facet.fixed()) {
String columnName = facet.name() + "==" + facet.value();
Object value = cache.get(tableName, columnName);
if (value != null) {
if (result.isPresent() && result.get() != value) {
// One facet matched, but another did not.
result = Optional.empty();
break;
} else {
result = Optional.of(value);
}
}
}
if (!facet.fixed()) {
String columnName = facet.name() + "==" + facet.value();
Object value = cache.get(tableName, columnName);
if (value != null) {
if (result.isPresent() && result.get() != value) {
// One facet matched, but another did not.
result = Optional.empty();
break;
} else {
result = Optional.of(value);
}
}
}
}
if (!result.isPresent()) {
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
if (parent != null) {
return parent.cacheLookup(facets);
}
}
}
return result;
}
@ -185,9 +183,9 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
if (parent != null) {
parent.mergeCache(cache);
} else {
session.mergeCache(cache);
}
elapsedTime_.stop();
session.mergeCache(cache);
}
elapsedTime_.stop();
// Apply all post-commit functions for
if (parent == null) {
@ -213,23 +211,23 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
});
// log.record(txn::abort)
// cache.invalidateSince(txn::start time)
if (!hasAborted()) {
elapsedTime_.stop();
logTimers("aborted");
}
if (!hasAborted()) {
elapsedTime_.stop();
logTimers("aborted");
}
}
private void mergeCache(Table<String, String, Object> from) {
Table<String, String, Object> to = this.cache;
from.rowMap().forEach((rowKey, columnMap) -> {
columnMap.forEach((columnKey, value) -> {
if (to.contains(rowKey, columnKey)) {
to.put(rowKey, columnKey, CacheUtil.merge(to.get(rowKey, columnKey), from.get(rowKey, columnKey)));
} else {
to.put(rowKey, columnKey, from.get(rowKey, columnKey));
}
});
});
Table<String, String, Object> to = this.cache;
from.rowMap().forEach((rowKey, columnMap) -> {
columnMap.forEach((columnKey, value) -> {
if (to.contains(rowKey, columnKey)) {
to.put(rowKey, columnKey, CacheUtil.merge(to.get(rowKey, columnKey), from.get(rowKey, columnKey)));
} else {
to.put(rowKey, columnKey, from.get(rowKey, columnKey));
}
});
});
}
public String describeConflicts() {

View file

@ -185,8 +185,8 @@ public final class Helenus {
}
if (metadata != null) {
metadataForEntity.putIfAbsent(iface, metadata);
}
metadataForEntity.putIfAbsent(iface, metadata);
}
return entity(iface, metadata);
}

View file

@ -29,11 +29,11 @@ import java.util.stream.Collectors;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*;
import brave.Tracer;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Table;
import brave.Tracer;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.UnboundFacet;
@ -95,7 +95,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
this.metricRegistry = metricRegistry;
this.zipkinTracer = tracer;
this.sessionCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE)
this.sessionCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE)
.expireAfterAccess(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).recordStats().build();
this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
@ -180,113 +180,112 @@ public final class HelenusSession extends AbstractSessionOperations implements C
}
@Override
public Object checkCache(String tableName, List<Facet> facets) {
List<String[]> facetCombinations = CacheUtil.flattenFacets(facets);
Object result = null;
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
result = sessionCache.getIfPresent(cacheKey);
if (result != null) {
return result;
}
}
return null;
public Object checkCache(String tableName, List<Facet> facets) {
List<String[]> facetCombinations = CacheUtil.flattenFacets(facets);
Object result = null;
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
result = sessionCache.getIfPresent(cacheKey);
if (result != null) {
return result;
}
}
return null;
}
@Override
public void updateCache(Object pojo, List<Facet> facets) {
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : facets) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(prop, value.toString());
} else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
}
});
if (binder.isBound()) {
boundFacets.add(binder.bind());
}
} else {
boundFacets.add(facet);
}
}
String tableName = CacheUtil.schemaName(facets);
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
Object value = sessionCache.getIfPresent(pojo);
Object mergedValue = null;
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
if (value == null) {
sessionCache.put(cacheKey, pojo);
} else {
if (mergedValue == null) {
mergedValue = pojo;
} else {
mergedValue = CacheUtil.merge(value, pojo);
}
sessionCache.put(mergedValue, pojo);
}
}
public void updateCache(Object pojo, List<Facet> facets) {
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : facets) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(prop, value.toString());
} else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
}
});
if (binder.isBound()) {
boundFacets.add(binder.bind());
}
} else {
boundFacets.add(facet);
}
}
String tableName = CacheUtil.schemaName(facets);
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
Object value = sessionCache.getIfPresent(pojo);
Object mergedValue = null;
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
if (value == null) {
sessionCache.put(cacheKey, pojo);
} else {
if (mergedValue == null) {
mergedValue = pojo;
} else {
mergedValue = CacheUtil.merge(value, pojo);
}
sessionCache.put(mergedValue, pojo);
}
}
}
}
@Override
public void mergeCache(Table<String, String, Object> uowCache) {
List<Object> pojos = uowCache.values().stream().distinct()
.collect(Collectors.toList());
for (Object pojo : pojos) {
HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
if (entity.isCacheable()) {
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(prop, value.toString());
} else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
}
});
if (binder.isBound()) {
boundFacets.add(binder.bind());
}
} else {
boundFacets.add(facet);
}
}
String tableName = entity.getName().toCql();
// NOTE: should equal `String tableName = CacheUtil.schemaName(facets);`
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
Object value = sessionCache.getIfPresent(pojo);
Object mergedValue = null;
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
if (value == null) {
sessionCache.put(cacheKey, pojo);
} else {
if (mergedValue == null) {
mergedValue = pojo;
} else {
mergedValue = CacheUtil.merge(value, pojo);
}
sessionCache.put(mergedValue, pojo);
}
}
}
}
}
@Override
public void mergeCache(Table<String, String, Object> uowCache) {
List<Object> pojos = uowCache.values().stream().distinct().collect(Collectors.toList());
for (Object pojo : pojos) {
HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
if (entity.isCacheable()) {
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(prop, value.toString());
} else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
}
});
if (binder.isBound()) {
boundFacets.add(binder.bind());
}
} else {
boundFacets.add(facet);
}
}
String tableName = entity.getName().toCql();
// NOTE: should equal `String tableName = CacheUtil.schemaName(facets);`
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
Object value = sessionCache.getIfPresent(pojo);
Object mergedValue = null;
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
if (value == null) {
sessionCache.put(cacheKey, pojo);
} else {
if (mergedValue == null) {
mergedValue = pojo;
} else {
mergedValue = CacheUtil.merge(value, pojo);
}
sessionCache.put(mergedValue, pojo);
}
}
}
}
}
public Metadata getMetadata() {
public Metadata getMetadata() {
return metadata;
}

View file

@ -19,48 +19,50 @@ import java.util.List;
import java.util.Optional;
import com.google.common.base.Stopwatch;
import net.helenus.core.cache.Facet;
public interface UnitOfWork<X extends Exception> extends AutoCloseable {
/**
* Marks the beginning of a transactional section of work. Will write a record
* to the shared write-ahead log.
*
* @return the handle used to commit or abort the work.
*/
UnitOfWork<X> begin();
/**
* Marks the beginning of a transactional section of work. Will write a record
* to the shared write-ahead log.
*
* @return the handle used to commit or abort the work.
*/
UnitOfWork<X> begin();
void addNestedUnitOfWork(UnitOfWork<X> uow);
void addNestedUnitOfWork(UnitOfWork<X> uow);
/**
* Checks to see if the work performed between calling begin and now can be
* committed or not.
*
* @return a function from which to chain work that only happens when commit is
* successful
* @throws X when the work overlaps with other concurrent writers.
*/
PostCommitFunction<Void, Void> commit() throws X;
/**
* Checks to see if the work performed between calling begin and now can be
* committed or not.
*
* @return a function from which to chain work that only happens when commit is
* successful
* @throws X
* when the work overlaps with other concurrent writers.
*/
PostCommitFunction<Void, Void> commit() throws X;
/**
* Explicitly abort the work within this unit of work. Any nested aborted unit
* of work will trigger the entire unit of work to commit.
*/
void abort();
/**
* Explicitly abort the work within this unit of work. Any nested aborted unit
* of work will trigger the entire unit of work to commit.
*/
void abort();
boolean hasAborted();
boolean hasAborted();
boolean hasCommitted();
boolean hasCommitted();
Optional<Object> cacheLookup(List<Facet> facets);
Optional<Object> cacheLookup(List<Facet> facets);
void cacheUpdate(Object pojo, List<Facet> facets);
void cacheUpdate(Object pojo, List<Facet> facets);
UnitOfWork setPurpose(String purpose);
UnitOfWork setPurpose(String purpose);
Stopwatch getExecutionTimer();
Stopwatch getExecutionTimer();
Stopwatch getCacheLookupTimer();
Stopwatch getCacheLookupTimer();
}

View file

@ -1,52 +1,49 @@
package net.helenus.core.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class CacheUtil {
public static List<String[]> combinations(List<String> items) {
int n = items.size();
if (n > 20 || n < 0) throw new IllegalArgumentException(n + " is out of range");
long e = Math.round(Math.pow(2, n));
List<String[]> out = new ArrayList<String[]>((int) e - 1);
for (int k = 1; k <= items.size(); k++) {
kCombinations(items, 0, k, new String[k], out);
}
return out;
}
public static List<String[]> combinations(List<String> items) {
int n = items.size();
if (n > 20 || n < 0)
throw new IllegalArgumentException(n + " is out of range");
long e = Math.round(Math.pow(2, n));
List<String[]> out = new ArrayList<String[]>((int) e - 1);
for (int k = 1; k <= items.size(); k++) {
kCombinations(items, 0, k, new String[k], out);
}
return out;
}
private static void kCombinations(List<String> items, int n, int k, String[] arr, List<String[]> out) {
if (k == 0) {
out.add(arr.clone());
} else {
for (int i = n; i <= items.size() - k; i++) {
arr[arr.length - k] = items.get(i);
kCombinations(items, i + 1, k - 1, arr, out);
}
}
}
private static void kCombinations(List<String> items, int n, int k, String[] arr, List<String[]> out) {
if (k == 0) {
out.add(arr.clone());
} else {
for (int i = n; i <= items.size() - k; i++) {
arr[arr.length - k] = items.get(i);
kCombinations(items, i + 1, k - 1, arr, out);
}
}
}
public static List<String[]> flattenFacets(List<Facet> facets) {
List<String[]> combinations = CacheUtil.combinations(facets.stream()
.filter(facet -> !facet.fixed())
.filter(facet -> facet.value() != null)
.map(facet -> {
return facet.name() + "==" + facet.value();
}).collect(Collectors.toList()));
return combinations;
}
public static List<String[]> flattenFacets(List<Facet> facets) {
List<String[]> combinations = CacheUtil.combinations(
facets.stream().filter(facet -> !facet.fixed()).filter(facet -> facet.value() != null).map(facet -> {
return facet.name() + "==" + facet.value();
}).collect(Collectors.toList()));
return combinations;
}
public static Object merge(Object to, Object from) {
return to; // TODO(gburd): yeah...
}
public static Object merge(Object to, Object from) {
return to; // TODO(gburd): yeah...
}
public static String schemaName(List<Facet> facets) {
return facets.stream().filter(Facet::fixed)
.map(facet -> facet.value().toString())
.collect(Collectors.joining("."));
}
public static String schemaName(List<Facet> facets) {
return facets.stream().filter(Facet::fixed).map(facet -> facet.value().toString())
.collect(Collectors.joining("."));
}
}

View file

@ -41,8 +41,13 @@ public class Facet<T> {
return value;
}
public Facet setFixed() { fixed = true; return this; }
public Facet setFixed() {
fixed = true;
return this;
}
public boolean fixed() { return fixed; }
public boolean fixed() {
return fixed;
}
}

View file

@ -16,8 +16,6 @@
package net.helenus.core.operation;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
@ -41,7 +39,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
return new PreparedOperation<E>(prepareStatement(), this);
}
public E sync() {//throws TimeoutException {
public E sync() {// throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits,
@ -52,7 +50,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
}
}
public E sync(UnitOfWork uow) {//throws TimeoutException {
public E sync(UnitOfWork uow) {// throws TimeoutException {
if (uow == null)
return sync();
@ -69,11 +67,11 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
public CompletableFuture<E> async() {
return CompletableFuture.<E>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
});
}
@ -81,11 +79,11 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
if (uow == null)
return async();
return CompletableFuture.<E>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
});
}
}

View file

@ -18,9 +18,6 @@ package net.helenus.core.operation;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.PreparedStatement;
@ -30,13 +27,6 @@ import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.CacheUtil;
@ -67,88 +57,88 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
});
}
public Optional<E> sync() {//throws TimeoutException {
public Optional<E> sync() {// throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
Optional<E> result = Optional.empty();
E cacheResult = null;
boolean updateCache = isSessionCacheable();
Optional<E> result = Optional.empty();
E cacheResult = null;
boolean updateCache = isSessionCacheable();
if (enableCache && isSessionCacheable()) {
List<Facet> facets = bindFacetValues();
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E)sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
result = Optional.of(cacheResult);
updateCache = false;
}
}
if (enableCache && isSessionCacheable()) {
List<Facet> facets = bindFacetValues();
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
result = Optional.of(cacheResult);
updateCache = false;
}
}
if (!result.isPresent()) {
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout,
queryTimeoutUnits,
showValues, false);
if (!result.isPresent()) {
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout,
queryTimeoutUnits, showValues, false);
// Transform the query result set into the desired shape.
result = transform(resultSet);
}
// Transform the query result set into the desired shape.
result = transform(resultSet);
}
if (updateCache && result.isPresent()) {
List<Facet> facets = getFacets();
if (facets != null && facets.size() > 1) {
sessionOps.updateCache(result.get(), facets);
}
}
return result;
} finally {
if (updateCache && result.isPresent()) {
List<Facet> facets = getFacets();
if (facets != null && facets.size() > 1) {
sessionOps.updateCache(result.get(), facets);
}
}
return result;
} finally {
context.stop();
}
}
public Optional<E> sync(UnitOfWork<?> uow) {//throws TimeoutException {
public Optional<E> sync(UnitOfWork<?> uow) {// throws TimeoutException {
if (uow == null)
return sync();
final Timer.Context context = requestLatency.time();
try {
Optional<E> result = Optional.empty();
E cacheResult = null;
boolean updateCache = true;
Optional<E> result = Optional.empty();
E cacheResult = null;
boolean updateCache = true;
if (enableCache) {
Stopwatch timer = uow.getCacheLookupTimer();
timer.start();
List<Facet> facets = bindFacetValues();
Stopwatch timer = uow.getCacheLookupTimer();
timer.start();
List<Facet> facets = bindFacetValues();
cacheResult = checkCache(uow, facets);
if (cacheResult != null) {
result = Optional.of(cacheResult);
updateCache = false;
} else {
if (isSessionCacheable()) {
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
result = Optional.of(cacheResult);
}
}
}
timer.stop();
} else {
if (isSessionCacheable()) {
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
result = Optional.of(cacheResult);
}
}
}
timer.stop();
}
if (!result.isPresent()) {
// Formulate the query and execute it against the Cassandra cluster.
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
// Transform the query result set into the desired shape.
// Transform the query result set into the desired shape.
result = transform(resultSet);
}
// If we have a result, it wasn't from the UOW cache, and we're caching things then we
// If we have a result, it wasn't from the UOW cache, and we're caching things
// then we
// need to put this result into the cache for future requests to find.
if (updateCache && result.isPresent()) {
if (updateCache && result.isPresent()) {
updateCache(uow, result.get(), getFacets());
}
@ -160,11 +150,11 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
public CompletableFuture<Optional<E>> async() {
return CompletableFuture.<Optional<E>>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
});
}
@ -172,11 +162,11 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
if (uow == null)
return async();
return CompletableFuture.<Optional<E>>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
});
}
}

View file

@ -47,7 +47,7 @@ import net.helenus.support.HelenusException;
public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>> extends Operation<E> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractStatementOperation.class);
private static final Logger LOG = LoggerFactory.getLogger(AbstractStatementOperation.class);
protected boolean enableCache = true;
protected boolean showValues = true;
@ -342,19 +342,19 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
protected void updateCache(UnitOfWork<?> uow, E pojo, List<Facet> identifyingFacets) {
List<Facet> facets = new ArrayList<>();
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
Map<String, Object> valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
for (Facet facet : identifyingFacets) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(prop, value.toString());
} else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
}
if (valueMap == null) {
Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false);
binder.setValueForProperty(prop, value.toString());
} else {
binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString());
}
facets.add(binder.bind());
});
} else {

View file

@ -17,10 +17,7 @@ package net.helenus.core.operation;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import com.codahale.metrics.Timer;
@ -61,52 +58,51 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
});
}
public Stream<E> sync() {//throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
Stream<E> resultStream = null;
E cacheResult = null;
boolean updateCache = isSessionCacheable();
public Stream<E> sync() {// throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
Stream<E> resultStream = null;
E cacheResult = null;
boolean updateCache = isSessionCacheable();
if (enableCache && isSessionCacheable()) {
List<Facet> facets = bindFacetValues();
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
resultStream = Stream.of(cacheResult);
updateCache = false;
}
}
if (enableCache && isSessionCacheable()) {
List<Facet> facets = bindFacetValues();
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
resultStream = Stream.of(cacheResult);
updateCache = false;
}
}
if (resultStream == null) {
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout,
queryTimeoutUnits,
showValues, false);
if (resultStream == null) {
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout,
queryTimeoutUnits, showValues, false);
// Transform the query result set into the desired shape.
resultStream = transform(resultSet);
}
// Transform the query result set into the desired shape.
resultStream = transform(resultSet);
}
if (updateCache && resultStream != null) {
List<Facet> facets = getFacets();
if (facets != null && facets.size() > 1) {
List<E> again = new ArrayList<>();
resultStream.forEach(result -> {
sessionOps.updateCache(result, facets);
again.add(result);
});
resultStream = again.stream();
}
}
return resultStream;
if (updateCache && resultStream != null) {
List<Facet> facets = getFacets();
if (facets != null && facets.size() > 1) {
List<E> again = new ArrayList<>();
resultStream.forEach(result -> {
sessionOps.updateCache(result, facets);
again.add(result);
});
resultStream = again.stream();
}
}
return resultStream;
} finally {
context.stop();
}
}
public Stream<E> sync(UnitOfWork<?> uow) {//throws TimeoutException {
public Stream<E> sync(UnitOfWork<?> uow) {// throws TimeoutException {
if (uow == null)
return sync();
@ -117,8 +113,8 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
boolean updateCache = true;
if (enableCache) {
Stopwatch timer = uow.getCacheLookupTimer();
timer.start();
Stopwatch timer = uow.getCacheLookupTimer();
timer.start();
List<Facet> facets = bindFacetValues();
cachedResult = checkCache(uow, facets);
if (cachedResult != null) {
@ -137,13 +133,13 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
// If we have a result and we're caching then we need to put it into the cache
// for future requests to find.
if (updateCache && resultStream != null) {
List<E> again = new ArrayList<>();
List<Facet> facets = getFacets();
resultStream.forEach(result -> {
updateCache(uow, result, facets);
again.add(result);
});
resultStream = again.stream();
List<E> again = new ArrayList<>();
List<Facet> facets = getFacets();
resultStream.forEach(result -> {
updateCache(uow, result, facets);
again.add(result);
});
resultStream = again.stream();
}
return resultStream;
@ -154,11 +150,11 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public CompletableFuture<Stream<E>> async() {
return CompletableFuture.<Stream<E>>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
});
}
@ -166,11 +162,11 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
if (uow == null)
return async();
return CompletableFuture.<Stream<E>>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
});
}
}

View file

@ -40,6 +40,8 @@ public final class BoundOperation<E> extends AbstractOperation<E, BoundOperation
return boundStatement;
}
@Override
public boolean isSessionCacheable() { return delegate.isSessionCacheable(); }
@Override
public boolean isSessionCacheable() {
return delegate.isSessionCacheable();
}
}

View file

@ -42,6 +42,8 @@ public final class BoundOptionalOperation<E> extends AbstractOptionalOperation<E
return boundStatement;
}
@Override
public boolean isSessionCacheable() { return delegate.isSessionCacheable(); }
@Override
public boolean isSessionCacheable() {
return delegate.isSessionCacheable();
}
}

View file

@ -50,6 +50,8 @@ public final class BoundStreamOperation<E> extends AbstractStreamOperation<E, Bo
return boundStatement;
}
@Override
public boolean isSessionCacheable() { return delegate.isSessionCacheable(); }
@Override
public boolean isSessionCacheable() {
return delegate.isSessionCacheable();
}
}

View file

@ -16,7 +16,6 @@
package net.helenus.core.operation;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import com.datastax.driver.core.ResultSet;
@ -237,7 +236,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
}
@Override
public T sync(UnitOfWork uow) {//throws TimeoutException {
public T sync(UnitOfWork uow) {// throws TimeoutException {
if (uow == null) {
return sync();
}

View file

@ -17,7 +17,6 @@ package net.helenus.core.operation;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
@ -25,11 +24,11 @@ import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.base.Stopwatch;
import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import com.google.common.base.Stopwatch;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.Facet;
@ -50,7 +49,7 @@ public abstract class Operation<E> {
}
public ResultSet execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, long timeout,
TimeUnit units, boolean showValues, boolean cached) { //throws TimeoutException {
TimeUnit units, boolean showValues, boolean cached) { // throws TimeoutException {
// Start recording in a Zipkin sub-span our execution time to perform this
// operation.
@ -68,17 +67,18 @@ public abstract class Operation<E> {
}
Statement statement = options(buildStatement(cached));
Stopwatch timer = null;
Stopwatch timer = null;
if (uow != null) {
timer = uow.getExecutionTimer();
timer.start();
}
timer = uow.getExecutionTimer();
timer.start();
}
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
ResultSet resultSet = futureResultSet.getUninterruptibly(); //TODO(gburd): (timeout, units);
ResultSet resultSet = futureResultSet.getUninterruptibly(); // TODO(gburd): (timeout, units);
if (uow != null) timer.stop();
if (uow != null)
timer.stop();
return resultSet;
return resultSet;
} finally {
@ -104,6 +104,8 @@ public abstract class Operation<E> {
return null;
}
public boolean isSessionCacheable() { return false; }
public boolean isSessionCacheable() {
return false;
}
}

View file

@ -60,6 +60,8 @@ public final class SelectFirstOperation<E> extends AbstractFilterOptionalOperati
return delegate.transform(resultSet).findFirst();
}
@Override
public boolean isSessionCacheable() { return delegate.isSessionCacheable(); }
@Override
public boolean isSessionCacheable() {
return delegate.isSessionCacheable();
}
}

View file

@ -55,6 +55,8 @@ public final class SelectFirstTransformingOperation<R, E>
return delegate.transform(resultSet).findFirst().map(fn);
}
@Override
public boolean isSessionCacheable() { return delegate.isSessionCacheable(); }
@Override
public boolean isSessionCacheable() {
return delegate.isSessionCacheable();
}
}

View file

@ -20,6 +20,9 @@ import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.querybuilder.BuiltStatement;
@ -41,12 +44,10 @@ import net.helenus.mapping.value.ColumnValueProvider;
import net.helenus.mapping.value.ValueProviderMap;
import net.helenus.support.Fun;
import net.helenus.support.HelenusMappingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, SelectOperation<E>> {
private static final Logger LOG = LoggerFactory.getLogger(SelectOperation.class);
private static final Logger LOG = LoggerFactory.getLogger(SelectOperation.class);
protected final List<HelenusPropertyNode> props = new ArrayList<HelenusPropertyNode>();
protected Function<Row, E> rowMapper = null;
@ -86,7 +87,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
entity.getOrderedProperties().stream().map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
isCacheable = entity.isCacheable();
isCacheable = entity.isCacheable();
}
public SelectOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity,
@ -98,7 +99,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
entity.getOrderedProperties().stream().map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
isCacheable = entity.isCacheable();
isCacheable = entity.isCacheable();
}
public SelectOperation(AbstractSessionOperations sessionOperations, Function<Row, E> rowMapper,
@ -185,8 +186,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
return this;
}
@Override
public boolean isSessionCacheable() { return isCacheable; }
@Override
public boolean isSessionCacheable() {
return isCacheable;
}
@Override
public List<Facet> getFacets() {
@ -200,10 +203,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet.getProperties().forEach(prop -> {
Filter filter = filters.get(prop);
if (filter != null) {
Object[] postulates = filter.postulateValues();

View file

@ -16,7 +16,6 @@
package net.helenus.core.operation;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import com.datastax.driver.core.ResultSet;
@ -570,12 +569,13 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
}
@Override
public E sync(UnitOfWork uow) {//throws TimeoutException {
public E sync(UnitOfWork uow) {// throws TimeoutException {
if (uow == null) {
return sync();
}
E result = super.sync(uow);
// TODO(gburd): Only drafted entity objects are updated in the cache at this time.
// TODO(gburd): Only drafted entity objects are updated in the cache at this
// time.
if (draft != null) {
updateCache(uow, result, getFacets());
}