diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index d7b666e..15abfae 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -19,25 +19,24 @@ import java.io.PrintStream; import java.util.List; import java.util.concurrent.Executor; -import com.google.common.cache.Cache; -import com.google.common.collect.Table; -import net.helenus.core.cache.Facet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.codahale.metrics.MetricRegistry; import com.datastax.driver.core.*; import com.datastax.driver.core.querybuilder.BuiltStatement; +import com.google.common.collect.Table; import com.google.common.util.concurrent.ListenableFuture; import brave.Tracer; +import net.helenus.core.cache.Facet; import net.helenus.mapping.value.ColumnValuePreparer; import net.helenus.mapping.value.ColumnValueProvider; import net.helenus.support.HelenusException; public abstract class AbstractSessionOperations { - private static final Logger LOG = LoggerFactory.getLogger(AbstractSessionOperations.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractSessionOperations.class); public abstract Session currentSession(); @@ -120,7 +119,8 @@ public abstract class AbstractSessionOperations { return null; } - public void mergeCache(Table cache) {} + public void mergeCache(Table cache) { + } RuntimeException translateException(RuntimeException e) { if (e instanceof HelenusException) { @@ -129,9 +129,12 @@ public abstract class AbstractSessionOperations { throw new HelenusException(e); } - public Object checkCache(String tableName, List facets) { return null; } + public Object checkCache(String tableName, List facets) { + return null; + } - public void updateCache(Object pojo, List facets) { } + public void updateCache(Object pojo, List facets) { + } void printCql(String cql) { getPrintStream().println(cql); diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index abe1e8a..fe3ccdf 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -18,6 +18,9 @@ package net.helenus.core; import java.util.*; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.diffplug.common.base.Errors; import com.google.common.base.Stopwatch; import com.google.common.collect.HashBasedTable; @@ -26,30 +29,25 @@ import com.google.common.collect.TreeTraverser; import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Encapsulates the concept of a "transaction" as a unit-of-work. */ public abstract class AbstractUnitOfWork implements UnitOfWork, AutoCloseable { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractUnitOfWork.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractUnitOfWork.class); private final List> nested = new ArrayList<>(); private final HelenusSession session; private final AbstractUnitOfWork parent; + // Cache: + private final Table cache = HashBasedTable.create(); private List postCommit = new ArrayList(); private boolean aborted = false; private boolean committed = false; - private String purpose_; private Stopwatch elapsedTime_; private Stopwatch databaseTime_ = Stopwatch.createUnstarted(); private Stopwatch cacheLookupTime_ = Stopwatch.createUnstarted(); - // Cache: - private final Table cache = HashBasedTable.create(); - protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork parent) { Objects.requireNonNull(session, "containing session cannot be null"); @@ -57,17 +55,17 @@ public abstract class AbstractUnitOfWork implements UnitOfW this.parent = parent; } - @Override - public Stopwatch getExecutionTimer() { - return databaseTime_; - } + @Override + public Stopwatch getExecutionTimer() { + return databaseTime_; + } - @Override - public Stopwatch getCacheLookupTimer() { - return cacheLookupTime_; - } + @Override + public Stopwatch getCacheLookupTimer() { + return cacheLookupTime_; + } - @Override + @Override public void addNestedUnitOfWork(UnitOfWork uow) { synchronized (nested) { nested.add((AbstractUnitOfWork) uow); @@ -82,54 +80,54 @@ public abstract class AbstractUnitOfWork implements UnitOfW } @Override - public UnitOfWork setPurpose(String purpose) { - purpose_ = purpose; - return this; - } + public UnitOfWork setPurpose(String purpose) { + purpose_ = purpose; + return this; + } - public void logTimers(String what) { - double e = (double)elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0; - double d = (double)databaseTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0; - double c = (double)cacheLookupTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0; - double fd = (d / (e - c)) * 100.0; - double fc = (c / (e - d)) * 100.0; - LOG.info(String.format("UOW(%s)%s %s (total: %.3fms cache: %.3fms %2.2f%% db: %.3fms %2.2f%%)", - hashCode(), (purpose_ == null ? "" : " " + purpose_), what, e, c, fc, d, fd)); - } + public void logTimers(String what) { + double e = (double) elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0; + double d = (double) databaseTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0; + double c = (double) cacheLookupTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0; + double fd = (d / (e - c)) * 100.0; + double fc = (c / (e - d)) * 100.0; + LOG.info(String.format("UOW(%s)%s %s (total: %.3fms cache: %.3fms %2.2f%% db: %.3fms %2.2f%%)", hashCode(), + (purpose_ == null ? "" : " " + purpose_), what, e, c, fc, d, fd)); + } - private void applyPostCommitFunctions() { + private void applyPostCommitFunctions() { if (!postCommit.isEmpty()) { for (CommitThunk f : postCommit) { f.apply(); } } - logTimers("committed"); + logTimers("committed"); } @Override public Optional cacheLookup(List facets) { - String tableName = CacheUtil.schemaName(facets); + String tableName = CacheUtil.schemaName(facets); Optional result = Optional.empty(); for (Facet facet : facets) { - if (!facet.fixed()) { - String columnName = facet.name() + "==" + facet.value(); - Object value = cache.get(tableName, columnName); - if (value != null) { - if (result.isPresent() && result.get() != value) { - // One facet matched, but another did not. - result = Optional.empty(); - break; - } else { - result = Optional.of(value); - } - } - } + if (!facet.fixed()) { + String columnName = facet.name() + "==" + facet.value(); + Object value = cache.get(tableName, columnName); + if (value != null) { + if (result.isPresent() && result.get() != value) { + // One facet matched, but another did not. + result = Optional.empty(); + break; + } else { + result = Optional.of(value); + } + } + } } if (!result.isPresent()) { // Be sure to check all enclosing UnitOfWork caches as well, we may be nested. if (parent != null) { return parent.cacheLookup(facets); - } + } } return result; } @@ -185,9 +183,9 @@ public abstract class AbstractUnitOfWork implements UnitOfW if (parent != null) { parent.mergeCache(cache); } else { - session.mergeCache(cache); - } - elapsedTime_.stop(); + session.mergeCache(cache); + } + elapsedTime_.stop(); // Apply all post-commit functions for if (parent == null) { @@ -213,23 +211,23 @@ public abstract class AbstractUnitOfWork implements UnitOfW }); // log.record(txn::abort) // cache.invalidateSince(txn::start time) - if (!hasAborted()) { - elapsedTime_.stop(); - logTimers("aborted"); - } + if (!hasAborted()) { + elapsedTime_.stop(); + logTimers("aborted"); + } } private void mergeCache(Table from) { - Table to = this.cache; - from.rowMap().forEach((rowKey, columnMap) -> { - columnMap.forEach((columnKey, value) -> { - if (to.contains(rowKey, columnKey)) { - to.put(rowKey, columnKey, CacheUtil.merge(to.get(rowKey, columnKey), from.get(rowKey, columnKey))); - } else { - to.put(rowKey, columnKey, from.get(rowKey, columnKey)); - } - }); - }); + Table to = this.cache; + from.rowMap().forEach((rowKey, columnMap) -> { + columnMap.forEach((columnKey, value) -> { + if (to.contains(rowKey, columnKey)) { + to.put(rowKey, columnKey, CacheUtil.merge(to.get(rowKey, columnKey), from.get(rowKey, columnKey))); + } else { + to.put(rowKey, columnKey, from.get(rowKey, columnKey)); + } + }); + }); } public String describeConflicts() { diff --git a/src/main/java/net/helenus/core/Helenus.java b/src/main/java/net/helenus/core/Helenus.java index 686b8dc..825ff1e 100644 --- a/src/main/java/net/helenus/core/Helenus.java +++ b/src/main/java/net/helenus/core/Helenus.java @@ -185,8 +185,8 @@ public final class Helenus { } if (metadata != null) { - metadataForEntity.putIfAbsent(iface, metadata); - } + metadataForEntity.putIfAbsent(iface, metadata); + } return entity(iface, metadata); } diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index d9b6eff..eb34514 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -29,11 +29,11 @@ import java.util.stream.Collectors; import com.codahale.metrics.MetricRegistry; import com.datastax.driver.core.*; - -import brave.Tracer; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Table; + +import brave.Tracer; import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; import net.helenus.core.cache.UnboundFacet; @@ -95,7 +95,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C this.metricRegistry = metricRegistry; this.zipkinTracer = tracer; - this.sessionCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE) + this.sessionCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE) .expireAfterAccess(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).recordStats().build(); this.valueProvider = new RowColumnValueProvider(this.sessionRepository); @@ -180,113 +180,112 @@ public final class HelenusSession extends AbstractSessionOperations implements C } @Override - public Object checkCache(String tableName, List facets) { - List facetCombinations = CacheUtil.flattenFacets(facets); - Object result = null; - for (String[] combination : facetCombinations) { - String cacheKey = tableName + "." + Arrays.toString(combination); - result = sessionCache.getIfPresent(cacheKey); - if (result != null) { - return result; - } - } - return null; + public Object checkCache(String tableName, List facets) { + List facetCombinations = CacheUtil.flattenFacets(facets); + Object result = null; + for (String[] combination : facetCombinations) { + String cacheKey = tableName + "." + Arrays.toString(combination); + result = sessionCache.getIfPresent(cacheKey); + if (result != null) { + return result; + } + } + return null; } @Override - public void updateCache(Object pojo, List facets) { - Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; - List boundFacets = new ArrayList<>(); - for (Facet facet : facets) { - if (facet instanceof UnboundFacet) { - UnboundFacet unboundFacet = (UnboundFacet) facet; - UnboundFacet.Binder binder = unboundFacet.binder(); - unboundFacet.getProperties().forEach(prop -> { - if (valueMap == null) { - Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false); - binder.setValueForProperty(prop, value.toString()); - } else { - binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString()); - } - }); - if (binder.isBound()) { - boundFacets.add(binder.bind()); - } - } else { - boundFacets.add(facet); - } - } - String tableName = CacheUtil.schemaName(facets); - List facetCombinations = CacheUtil.flattenFacets(boundFacets); - Object value = sessionCache.getIfPresent(pojo); - Object mergedValue = null; - for (String[] combination : facetCombinations) { - String cacheKey = tableName + "." + Arrays.toString(combination); - if (value == null) { - sessionCache.put(cacheKey, pojo); - } else { - if (mergedValue == null) { - mergedValue = pojo; - } else { - mergedValue = CacheUtil.merge(value, pojo); - } - sessionCache.put(mergedValue, pojo); - } - } + public void updateCache(Object pojo, List facets) { + Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; + List boundFacets = new ArrayList<>(); + for (Facet facet : facets) { + if (facet instanceof UnboundFacet) { + UnboundFacet unboundFacet = (UnboundFacet) facet; + UnboundFacet.Binder binder = unboundFacet.binder(); + unboundFacet.getProperties().forEach(prop -> { + if (valueMap == null) { + Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false); + binder.setValueForProperty(prop, value.toString()); + } else { + binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString()); + } + }); + if (binder.isBound()) { + boundFacets.add(binder.bind()); + } + } else { + boundFacets.add(facet); + } + } + String tableName = CacheUtil.schemaName(facets); + List facetCombinations = CacheUtil.flattenFacets(boundFacets); + Object value = sessionCache.getIfPresent(pojo); + Object mergedValue = null; + for (String[] combination : facetCombinations) { + String cacheKey = tableName + "." + Arrays.toString(combination); + if (value == null) { + sessionCache.put(cacheKey, pojo); + } else { + if (mergedValue == null) { + mergedValue = pojo; + } else { + mergedValue = CacheUtil.merge(value, pojo); + } + sessionCache.put(mergedValue, pojo); + } + } - } + } - @Override - public void mergeCache(Table uowCache) { - List pojos = uowCache.values().stream().distinct() - .collect(Collectors.toList()); - for (Object pojo : pojos) { - HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo)); - Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; - if (entity.isCacheable()) { - List boundFacets = new ArrayList<>(); - for (Facet facet : entity.getFacets()) { - if (facet instanceof UnboundFacet) { - UnboundFacet unboundFacet = (UnboundFacet) facet; - UnboundFacet.Binder binder = unboundFacet.binder(); - unboundFacet.getProperties().forEach(prop -> { - if (valueMap == null) { - Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false); - binder.setValueForProperty(prop, value.toString()); - } else { - binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString()); - } - }); - if (binder.isBound()) { - boundFacets.add(binder.bind()); - } - } else { - boundFacets.add(facet); - } - } - String tableName = entity.getName().toCql(); - // NOTE: should equal `String tableName = CacheUtil.schemaName(facets);` - List facetCombinations = CacheUtil.flattenFacets(boundFacets); - Object value = sessionCache.getIfPresent(pojo); - Object mergedValue = null; - for (String[] combination : facetCombinations) { - String cacheKey = tableName + "." + Arrays.toString(combination); - if (value == null) { - sessionCache.put(cacheKey, pojo); - } else { - if (mergedValue == null) { - mergedValue = pojo; - } else { - mergedValue = CacheUtil.merge(value, pojo); - } - sessionCache.put(mergedValue, pojo); - } - } - } - } - } + @Override + public void mergeCache(Table uowCache) { + List pojos = uowCache.values().stream().distinct().collect(Collectors.toList()); + for (Object pojo : pojos) { + HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo)); + Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; + if (entity.isCacheable()) { + List boundFacets = new ArrayList<>(); + for (Facet facet : entity.getFacets()) { + if (facet instanceof UnboundFacet) { + UnboundFacet unboundFacet = (UnboundFacet) facet; + UnboundFacet.Binder binder = unboundFacet.binder(); + unboundFacet.getProperties().forEach(prop -> { + if (valueMap == null) { + Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false); + binder.setValueForProperty(prop, value.toString()); + } else { + binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString()); + } + }); + if (binder.isBound()) { + boundFacets.add(binder.bind()); + } + } else { + boundFacets.add(facet); + } + } + String tableName = entity.getName().toCql(); + // NOTE: should equal `String tableName = CacheUtil.schemaName(facets);` + List facetCombinations = CacheUtil.flattenFacets(boundFacets); + Object value = sessionCache.getIfPresent(pojo); + Object mergedValue = null; + for (String[] combination : facetCombinations) { + String cacheKey = tableName + "." + Arrays.toString(combination); + if (value == null) { + sessionCache.put(cacheKey, pojo); + } else { + if (mergedValue == null) { + mergedValue = pojo; + } else { + mergedValue = CacheUtil.merge(value, pojo); + } + sessionCache.put(mergedValue, pojo); + } + } + } + } + } - public Metadata getMetadata() { + public Metadata getMetadata() { return metadata; } diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index c0d8817..6ff3496 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -19,48 +19,50 @@ import java.util.List; import java.util.Optional; import com.google.common.base.Stopwatch; + import net.helenus.core.cache.Facet; public interface UnitOfWork extends AutoCloseable { - /** - * Marks the beginning of a transactional section of work. Will write a record - * to the shared write-ahead log. - * - * @return the handle used to commit or abort the work. - */ - UnitOfWork begin(); + /** + * Marks the beginning of a transactional section of work. Will write a record + * to the shared write-ahead log. + * + * @return the handle used to commit or abort the work. + */ + UnitOfWork begin(); - void addNestedUnitOfWork(UnitOfWork uow); + void addNestedUnitOfWork(UnitOfWork uow); - /** - * Checks to see if the work performed between calling begin and now can be - * committed or not. - * - * @return a function from which to chain work that only happens when commit is - * successful - * @throws X when the work overlaps with other concurrent writers. - */ - PostCommitFunction commit() throws X; + /** + * Checks to see if the work performed between calling begin and now can be + * committed or not. + * + * @return a function from which to chain work that only happens when commit is + * successful + * @throws X + * when the work overlaps with other concurrent writers. + */ + PostCommitFunction commit() throws X; - /** - * Explicitly abort the work within this unit of work. Any nested aborted unit - * of work will trigger the entire unit of work to commit. - */ - void abort(); + /** + * Explicitly abort the work within this unit of work. Any nested aborted unit + * of work will trigger the entire unit of work to commit. + */ + void abort(); - boolean hasAborted(); + boolean hasAborted(); - boolean hasCommitted(); + boolean hasCommitted(); - Optional cacheLookup(List facets); + Optional cacheLookup(List facets); - void cacheUpdate(Object pojo, List facets); + void cacheUpdate(Object pojo, List facets); - UnitOfWork setPurpose(String purpose); + UnitOfWork setPurpose(String purpose); - Stopwatch getExecutionTimer(); + Stopwatch getExecutionTimer(); - Stopwatch getCacheLookupTimer(); + Stopwatch getCacheLookupTimer(); } diff --git a/src/main/java/net/helenus/core/cache/CacheUtil.java b/src/main/java/net/helenus/core/cache/CacheUtil.java index de5a1e7..37dbc30 100644 --- a/src/main/java/net/helenus/core/cache/CacheUtil.java +++ b/src/main/java/net/helenus/core/cache/CacheUtil.java @@ -1,52 +1,49 @@ package net.helenus.core.cache; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; public class CacheUtil { - public static List combinations(List items) { - int n = items.size(); - if (n > 20 || n < 0) throw new IllegalArgumentException(n + " is out of range"); - long e = Math.round(Math.pow(2, n)); - List out = new ArrayList((int) e - 1); - for (int k = 1; k <= items.size(); k++) { - kCombinations(items, 0, k, new String[k], out); - } - return out; - } + public static List combinations(List items) { + int n = items.size(); + if (n > 20 || n < 0) + throw new IllegalArgumentException(n + " is out of range"); + long e = Math.round(Math.pow(2, n)); + List out = new ArrayList((int) e - 1); + for (int k = 1; k <= items.size(); k++) { + kCombinations(items, 0, k, new String[k], out); + } + return out; + } - private static void kCombinations(List items, int n, int k, String[] arr, List out) { - if (k == 0) { - out.add(arr.clone()); - } else { - for (int i = n; i <= items.size() - k; i++) { - arr[arr.length - k] = items.get(i); - kCombinations(items, i + 1, k - 1, arr, out); - } - } - } + private static void kCombinations(List items, int n, int k, String[] arr, List out) { + if (k == 0) { + out.add(arr.clone()); + } else { + for (int i = n; i <= items.size() - k; i++) { + arr[arr.length - k] = items.get(i); + kCombinations(items, i + 1, k - 1, arr, out); + } + } + } - public static List flattenFacets(List facets) { - List combinations = CacheUtil.combinations(facets.stream() - .filter(facet -> !facet.fixed()) - .filter(facet -> facet.value() != null) - .map(facet -> { - return facet.name() + "==" + facet.value(); - }).collect(Collectors.toList())); - return combinations; - } + public static List flattenFacets(List facets) { + List combinations = CacheUtil.combinations( + facets.stream().filter(facet -> !facet.fixed()).filter(facet -> facet.value() != null).map(facet -> { + return facet.name() + "==" + facet.value(); + }).collect(Collectors.toList())); + return combinations; + } - public static Object merge(Object to, Object from) { - return to; // TODO(gburd): yeah... - } + public static Object merge(Object to, Object from) { + return to; // TODO(gburd): yeah... + } - public static String schemaName(List facets) { - return facets.stream().filter(Facet::fixed) - .map(facet -> facet.value().toString()) - .collect(Collectors.joining(".")); - } + public static String schemaName(List facets) { + return facets.stream().filter(Facet::fixed).map(facet -> facet.value().toString()) + .collect(Collectors.joining(".")); + } } diff --git a/src/main/java/net/helenus/core/cache/Facet.java b/src/main/java/net/helenus/core/cache/Facet.java index f063adc..4f74ab5 100644 --- a/src/main/java/net/helenus/core/cache/Facet.java +++ b/src/main/java/net/helenus/core/cache/Facet.java @@ -41,8 +41,13 @@ public class Facet { return value; } - public Facet setFixed() { fixed = true; return this; } + public Facet setFixed() { + fixed = true; + return this; + } - public boolean fixed() { return fixed; } + public boolean fixed() { + return fixed; + } } diff --git a/src/main/java/net/helenus/core/operation/AbstractOperation.java b/src/main/java/net/helenus/core/operation/AbstractOperation.java index 09649dd..4afbdc0 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOperation.java @@ -16,8 +16,6 @@ package net.helenus.core.operation; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeoutException; import com.codahale.metrics.Timer; import com.datastax.driver.core.ResultSet; @@ -41,7 +39,7 @@ public abstract class AbstractOperation> ex return new PreparedOperation(prepareStatement(), this); } - public E sync() {//throws TimeoutException { + public E sync() {// throws TimeoutException { final Timer.Context context = requestLatency.time(); try { ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, @@ -52,7 +50,7 @@ public abstract class AbstractOperation> ex } } - public E sync(UnitOfWork uow) {//throws TimeoutException { + public E sync(UnitOfWork uow) {// throws TimeoutException { if (uow == null) return sync(); @@ -69,11 +67,11 @@ public abstract class AbstractOperation> ex public CompletableFuture async() { return CompletableFuture.supplyAsync(() -> { -// try { - return sync(); -// } catch (TimeoutException ex) { -// throw new CompletionException(ex); -// } + // try { + return sync(); + // } catch (TimeoutException ex) { + // throw new CompletionException(ex); + // } }); } @@ -81,11 +79,11 @@ public abstract class AbstractOperation> ex if (uow == null) return async(); return CompletableFuture.supplyAsync(() -> { -// try { - return sync(); -// } catch (TimeoutException ex) { -// throw new CompletionException(ex); -// } + // try { + return sync(); + // } catch (TimeoutException ex) { + // throw new CompletionException(ex); + // } }); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index 1a450b7..fb37c63 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -18,9 +18,6 @@ package net.helenus.core.operation; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; import com.codahale.metrics.Timer; import com.datastax.driver.core.PreparedStatement; @@ -30,13 +27,6 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.codahale.metrics.Timer; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.google.common.base.Function; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; - import net.helenus.core.AbstractSessionOperations; import net.helenus.core.UnitOfWork; import net.helenus.core.cache.CacheUtil; @@ -67,88 +57,88 @@ public abstract class AbstractOptionalOperation sync() {//throws TimeoutException { + public Optional sync() {// throws TimeoutException { final Timer.Context context = requestLatency.time(); try { - Optional result = Optional.empty(); - E cacheResult = null; - boolean updateCache = isSessionCacheable(); + Optional result = Optional.empty(); + E cacheResult = null; + boolean updateCache = isSessionCacheable(); - if (enableCache && isSessionCacheable()) { - List facets = bindFacetValues(); - String tableName = CacheUtil.schemaName(facets); - cacheResult = (E)sessionOps.checkCache(tableName, facets); - if (cacheResult != null) { - result = Optional.of(cacheResult); - updateCache = false; - } - } + if (enableCache && isSessionCacheable()) { + List facets = bindFacetValues(); + String tableName = CacheUtil.schemaName(facets); + cacheResult = (E) sessionOps.checkCache(tableName, facets); + if (cacheResult != null) { + result = Optional.of(cacheResult); + updateCache = false; + } + } - if (!result.isPresent()) { - // Formulate the query and execute it against the Cassandra cluster. - ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, - queryTimeoutUnits, - showValues, false); + if (!result.isPresent()) { + // Formulate the query and execute it against the Cassandra cluster. + ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, + queryTimeoutUnits, showValues, false); - // Transform the query result set into the desired shape. - result = transform(resultSet); - } + // Transform the query result set into the desired shape. + result = transform(resultSet); + } - if (updateCache && result.isPresent()) { - List facets = getFacets(); - if (facets != null && facets.size() > 1) { - sessionOps.updateCache(result.get(), facets); - } - } - return result; - } finally { + if (updateCache && result.isPresent()) { + List facets = getFacets(); + if (facets != null && facets.size() > 1) { + sessionOps.updateCache(result.get(), facets); + } + } + return result; + } finally { context.stop(); } } - public Optional sync(UnitOfWork uow) {//throws TimeoutException { + public Optional sync(UnitOfWork uow) {// throws TimeoutException { if (uow == null) return sync(); final Timer.Context context = requestLatency.time(); try { - Optional result = Optional.empty(); - E cacheResult = null; - boolean updateCache = true; + Optional result = Optional.empty(); + E cacheResult = null; + boolean updateCache = true; if (enableCache) { - Stopwatch timer = uow.getCacheLookupTimer(); - timer.start(); - List facets = bindFacetValues(); + Stopwatch timer = uow.getCacheLookupTimer(); + timer.start(); + List facets = bindFacetValues(); cacheResult = checkCache(uow, facets); if (cacheResult != null) { result = Optional.of(cacheResult); updateCache = false; - } else { - if (isSessionCacheable()) { - String tableName = CacheUtil.schemaName(facets); - cacheResult = (E) sessionOps.checkCache(tableName, facets); - if (cacheResult != null) { - result = Optional.of(cacheResult); - } - } - } - timer.stop(); + } else { + if (isSessionCacheable()) { + String tableName = CacheUtil.schemaName(facets); + cacheResult = (E) sessionOps.checkCache(tableName, facets); + if (cacheResult != null) { + result = Optional.of(cacheResult); + } + } + } + timer.stop(); } if (!result.isPresent()) { - // Formulate the query and execute it against the Cassandra cluster. + // Formulate the query and execute it against the Cassandra cluster. ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, true); - // Transform the query result set into the desired shape. + // Transform the query result set into the desired shape. result = transform(resultSet); } - // If we have a result, it wasn't from the UOW cache, and we're caching things then we + // If we have a result, it wasn't from the UOW cache, and we're caching things + // then we // need to put this result into the cache for future requests to find. - if (updateCache && result.isPresent()) { + if (updateCache && result.isPresent()) { updateCache(uow, result.get(), getFacets()); } @@ -160,11 +150,11 @@ public abstract class AbstractOptionalOperation> async() { return CompletableFuture.>supplyAsync(() -> { -// try { - return sync(); -// } catch (TimeoutException ex) { -// throw new CompletionException(ex); -// } + // try { + return sync(); + // } catch (TimeoutException ex) { + // throw new CompletionException(ex); + // } }); } @@ -172,11 +162,11 @@ public abstract class AbstractOptionalOperation>supplyAsync(() -> { -// try { - return sync(); -// } catch (TimeoutException ex) { -// throw new CompletionException(ex); -// } + // try { + return sync(); + // } catch (TimeoutException ex) { + // throw new CompletionException(ex); + // } }); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java index 633da38..c46a0f0 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java @@ -47,7 +47,7 @@ import net.helenus.support.HelenusException; public abstract class AbstractStatementOperation> extends Operation { - private static final Logger LOG = LoggerFactory.getLogger(AbstractStatementOperation.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractStatementOperation.class); protected boolean enableCache = true; protected boolean showValues = true; @@ -342,19 +342,19 @@ public abstract class AbstractStatementOperation uow, E pojo, List identifyingFacets) { List facets = new ArrayList<>(); - Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; + Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; for (Facet facet : identifyingFacets) { if (facet instanceof UnboundFacet) { UnboundFacet unboundFacet = (UnboundFacet) facet; UnboundFacet.Binder binder = unboundFacet.binder(); unboundFacet.getProperties().forEach(prop -> { - if (valueMap == null) { - Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false); - binder.setValueForProperty(prop, value.toString()); - } else { - binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString()); - } + if (valueMap == null) { + Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, false); + binder.setValueForProperty(prop, value.toString()); + } else { + binder.setValueForProperty(prop, valueMap.get(prop.getPropertyName()).toString()); + } facets.add(binder.bind()); }); } else { diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index 4a01d56..05201ab 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -17,10 +17,7 @@ package net.helenus.core.operation; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import com.codahale.metrics.Timer; @@ -61,52 +58,51 @@ public abstract class AbstractStreamOperation sync() {//throws TimeoutException { - final Timer.Context context = requestLatency.time(); - try { - Stream resultStream = null; - E cacheResult = null; - boolean updateCache = isSessionCacheable(); + public Stream sync() {// throws TimeoutException { + final Timer.Context context = requestLatency.time(); + try { + Stream resultStream = null; + E cacheResult = null; + boolean updateCache = isSessionCacheable(); - if (enableCache && isSessionCacheable()) { - List facets = bindFacetValues(); - String tableName = CacheUtil.schemaName(facets); - cacheResult = (E) sessionOps.checkCache(tableName, facets); - if (cacheResult != null) { - resultStream = Stream.of(cacheResult); - updateCache = false; - } - } + if (enableCache && isSessionCacheable()) { + List facets = bindFacetValues(); + String tableName = CacheUtil.schemaName(facets); + cacheResult = (E) sessionOps.checkCache(tableName, facets); + if (cacheResult != null) { + resultStream = Stream.of(cacheResult); + updateCache = false; + } + } - if (resultStream == null) { - // Formulate the query and execute it against the Cassandra cluster. - ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, - queryTimeoutUnits, - showValues, false); + if (resultStream == null) { + // Formulate the query and execute it against the Cassandra cluster. + ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, + queryTimeoutUnits, showValues, false); - // Transform the query result set into the desired shape. - resultStream = transform(resultSet); - } + // Transform the query result set into the desired shape. + resultStream = transform(resultSet); + } - if (updateCache && resultStream != null) { - List facets = getFacets(); - if (facets != null && facets.size() > 1) { - List again = new ArrayList<>(); - resultStream.forEach(result -> { - sessionOps.updateCache(result, facets); - again.add(result); - }); - resultStream = again.stream(); - } - } - return resultStream; + if (updateCache && resultStream != null) { + List facets = getFacets(); + if (facets != null && facets.size() > 1) { + List again = new ArrayList<>(); + resultStream.forEach(result -> { + sessionOps.updateCache(result, facets); + again.add(result); + }); + resultStream = again.stream(); + } + } + return resultStream; } finally { context.stop(); } } - public Stream sync(UnitOfWork uow) {//throws TimeoutException { + public Stream sync(UnitOfWork uow) {// throws TimeoutException { if (uow == null) return sync(); @@ -117,8 +113,8 @@ public abstract class AbstractStreamOperation facets = bindFacetValues(); cachedResult = checkCache(uow, facets); if (cachedResult != null) { @@ -137,13 +133,13 @@ public abstract class AbstractStreamOperation again = new ArrayList<>(); - List facets = getFacets(); - resultStream.forEach(result -> { - updateCache(uow, result, facets); - again.add(result); - }); - resultStream = again.stream(); + List again = new ArrayList<>(); + List facets = getFacets(); + resultStream.forEach(result -> { + updateCache(uow, result, facets); + again.add(result); + }); + resultStream = again.stream(); } return resultStream; @@ -154,11 +150,11 @@ public abstract class AbstractStreamOperation> async() { return CompletableFuture.>supplyAsync(() -> { -// try { - return sync(); -// } catch (TimeoutException ex) { -// throw new CompletionException(ex); -// } + // try { + return sync(); + // } catch (TimeoutException ex) { + // throw new CompletionException(ex); + // } }); } @@ -166,11 +162,11 @@ public abstract class AbstractStreamOperation>supplyAsync(() -> { -// try { - return sync(); -// } catch (TimeoutException ex) { -// throw new CompletionException(ex); -// } + // try { + return sync(); + // } catch (TimeoutException ex) { + // throw new CompletionException(ex); + // } }); } } diff --git a/src/main/java/net/helenus/core/operation/BoundOperation.java b/src/main/java/net/helenus/core/operation/BoundOperation.java index 4c71c7d..002dbd6 100644 --- a/src/main/java/net/helenus/core/operation/BoundOperation.java +++ b/src/main/java/net/helenus/core/operation/BoundOperation.java @@ -40,6 +40,8 @@ public final class BoundOperation extends AbstractOperation extends AbstractOptionalOperation extends AbstractStreamOperation extends AbstractOperation { } public ResultSet execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, long timeout, - TimeUnit units, boolean showValues, boolean cached) { //throws TimeoutException { + TimeUnit units, boolean showValues, boolean cached) { // throws TimeoutException { // Start recording in a Zipkin sub-span our execution time to perform this // operation. @@ -68,17 +67,18 @@ public abstract class Operation { } Statement statement = options(buildStatement(cached)); - Stopwatch timer = null; + Stopwatch timer = null; if (uow != null) { - timer = uow.getExecutionTimer(); - timer.start(); - } + timer = uow.getExecutionTimer(); + timer.start(); + } ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); - ResultSet resultSet = futureResultSet.getUninterruptibly(); //TODO(gburd): (timeout, units); + ResultSet resultSet = futureResultSet.getUninterruptibly(); // TODO(gburd): (timeout, units); - if (uow != null) timer.stop(); + if (uow != null) + timer.stop(); - return resultSet; + return resultSet; } finally { @@ -104,6 +104,8 @@ public abstract class Operation { return null; } - public boolean isSessionCacheable() { return false; } + public boolean isSessionCacheable() { + return false; + } } diff --git a/src/main/java/net/helenus/core/operation/SelectFirstOperation.java b/src/main/java/net/helenus/core/operation/SelectFirstOperation.java index 8f46c8e..15a25a6 100644 --- a/src/main/java/net/helenus/core/operation/SelectFirstOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectFirstOperation.java @@ -60,6 +60,8 @@ public final class SelectFirstOperation extends AbstractFilterOptionalOperati return delegate.transform(resultSet).findFirst(); } - @Override - public boolean isSessionCacheable() { return delegate.isSessionCacheable(); } + @Override + public boolean isSessionCacheable() { + return delegate.isSessionCacheable(); + } } diff --git a/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java b/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java index e878ac6..727d663 100644 --- a/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java @@ -55,6 +55,8 @@ public final class SelectFirstTransformingOperation return delegate.transform(resultSet).findFirst().map(fn); } - @Override - public boolean isSessionCacheable() { return delegate.isSessionCacheable(); } + @Override + public boolean isSessionCacheable() { + return delegate.isSessionCacheable(); + } } diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index 915cdbf..bc658f2 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -20,6 +20,9 @@ import java.util.function.Function; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.querybuilder.BuiltStatement; @@ -41,12 +44,10 @@ import net.helenus.mapping.value.ColumnValueProvider; import net.helenus.mapping.value.ValueProviderMap; import net.helenus.support.Fun; import net.helenus.support.HelenusMappingException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public final class SelectOperation extends AbstractFilterStreamOperation> { - private static final Logger LOG = LoggerFactory.getLogger(SelectOperation.class); + private static final Logger LOG = LoggerFactory.getLogger(SelectOperation.class); protected final List props = new ArrayList(); protected Function rowMapper = null; @@ -86,7 +87,7 @@ public final class SelectOperation extends AbstractFilterStreamOperation new HelenusPropertyNode(p, Optional.empty())) .forEach(p -> this.props.add(p)); - isCacheable = entity.isCacheable(); + isCacheable = entity.isCacheable(); } public SelectOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity, @@ -98,7 +99,7 @@ public final class SelectOperation extends AbstractFilterStreamOperation new HelenusPropertyNode(p, Optional.empty())) .forEach(p -> this.props.add(p)); - isCacheable = entity.isCacheable(); + isCacheable = entity.isCacheable(); } public SelectOperation(AbstractSessionOperations sessionOperations, Function rowMapper, @@ -185,8 +186,10 @@ public final class SelectOperation extends AbstractFilterStreamOperation getFacets() { @@ -200,10 +203,10 @@ public final class SelectOperation extends AbstractFilterStreamOperation boundFacets = new ArrayList<>(); for (Facet facet : entity.getFacets()) { - if (facet instanceof UnboundFacet) { - UnboundFacet unboundFacet = (UnboundFacet) facet; - UnboundFacet.Binder binder = unboundFacet.binder(); - unboundFacet.getProperties().forEach(prop -> { + if (facet instanceof UnboundFacet) { + UnboundFacet unboundFacet = (UnboundFacet) facet; + UnboundFacet.Binder binder = unboundFacet.binder(); + unboundFacet.getProperties().forEach(prop -> { Filter filter = filters.get(prop); if (filter != null) { Object[] postulates = filter.postulateValues(); diff --git a/src/main/java/net/helenus/core/operation/UpdateOperation.java b/src/main/java/net/helenus/core/operation/UpdateOperation.java index 201365d..9227cb8 100644 --- a/src/main/java/net/helenus/core/operation/UpdateOperation.java +++ b/src/main/java/net/helenus/core/operation/UpdateOperation.java @@ -16,7 +16,6 @@ package net.helenus.core.operation; import java.util.*; -import java.util.concurrent.TimeoutException; import java.util.function.Function; import com.datastax.driver.core.ResultSet; @@ -570,12 +569,13 @@ public final class UpdateOperation extends AbstractFilterOperation