From 654f4434bfbf208175f9ade371f0fe2754b8fb61 Mon Sep 17 00:00:00 2001
From: Greg Burd
Date: Wed, 28 Mar 2018 08:27:49 -0400
Subject: [PATCH] Remove Zipkin. Revert abstracted UnitOfWork into a single
 class.

---
 helenus-core.iml                                   |   3 -
 pom.xml                                            |  14 +-
 .../core/AbstractSessionOperations.java            |   5 -
 .../net/helenus/core/AbstractUnitOfWork.java       | 570 ------------------
 .../java/net/helenus/core/HelenusSession.java      |  96 +--
 .../net/helenus/core/PostCommitFunction.java       |   2 +-
 .../net/helenus/core/SessionInitializer.java       |  17 +-
 .../java/net/helenus/core/UnitOfWork.java          | 603 ++++++++++++++++--
 .../java/net/helenus/core/UnitOfWorkImpl.java      |  26 -
 .../core/operation/AbstractOperation.java          |  17 +-
 .../operation/AbstractOptionalOperation.java       |  14 +-
 .../operation/AbstractStatementOperation.java      |  17 +-
 .../operation/AbstractStreamOperation.java         |  10 +-
 .../core/operation/BatchOperation.java             |  18 +-
 .../net/helenus/core/operation/Operation.java      | 146 ++---
 .../core/simple/SimpleUserTest.java                |  17 -
 .../core/unitofwork/AndThenOrderTest.java          |  10 +-
 17 files changed, 655 insertions(+), 930 deletions(-)
 delete mode 100644 src/main/java/net/helenus/core/AbstractUnitOfWork.java
 delete mode 100644 src/main/java/net/helenus/core/UnitOfWorkImpl.java

diff --git a/helenus-core.iml b/helenus-core.iml
index 2578277..d38133a 100644
--- a/helenus-core.iml
+++ b/helenus-core.iml
@@ -36,9 +36,6 @@
-
-
-
diff --git a/pom.xml b/pom.xml
index 0011e00..8b502a7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -154,19 +154,7 @@
       <version>20.0</version>
     </dependency>
 
-    <dependency>
-      <groupId>io.zipkin.java</groupId>
-      <artifactId>zipkin</artifactId>
-      <version>1.29.2</version>
-    </dependency>
-
-    <dependency>
-      <groupId>io.zipkin.brave</groupId>
-      <artifactId>brave</artifactId>
-      <version>4.0.6</version>
-    </dependency>
-
+
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java
index 65d98f8..a5603e6 100644
--- a/src/main/java/net/helenus/core/AbstractSessionOperations.java
+++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java
@@ -15,7 +15,6 @@
  */
 package net.helenus.core;
 
-import brave.Tracer;
 import com.codahale.metrics.MetricRegistry;
 import com.datastax.driver.core.*;
 import com.google.common.base.Stopwatch;
@@ -110,10 +109,6 @@ public abstract class AbstractSessionOperations {
     }
   }
 
-  public Tracer getZipkinTracer() {
-    return null;
-  }
-
   public MetricRegistry getMetricRegistry() {
     return null;
   }
diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java
deleted file mode 100644
index 51ec9cc..0000000
--- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java
+++ /dev/null
@@ -1,570 +0,0 @@
-/*
- * Copyright (C) 2015 The Helenus Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package net.helenus.core; - -import static net.helenus.core.HelenusSession.deleted; - -import com.google.common.base.Stopwatch; -import com.google.common.collect.HashBasedTable; -import com.google.common.collect.Table; -import com.google.common.collect.TreeTraverser; -import java.io.Serializable; -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; -import javax.cache.Cache; -import javax.cache.CacheManager; -import javax.cache.integration.CacheLoader; -import javax.cache.integration.CacheLoaderException; -import net.helenus.core.cache.CacheUtil; -import net.helenus.core.cache.Facet; -import net.helenus.core.cache.MapCache; -import net.helenus.core.operation.AbstractOperation; -import net.helenus.core.operation.BatchOperation; -import net.helenus.mapping.MappingUtil; -import net.helenus.support.Either; -import net.helenus.support.HelenusException; -import org.apache.commons.lang3.SerializationUtils; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Encapsulates the concept of a "transaction" as a unit-of-work. */ -public abstract class AbstractUnitOfWork - implements UnitOfWork, AutoCloseable { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractUnitOfWork.class); - - private final List> nested = new ArrayList<>(); - private final HelenusSession session; - public final AbstractUnitOfWork parent; - private final Table>> cache = HashBasedTable.create(); - private final MapCache statementCache; - protected String purpose; - protected List nestedPurposes = new ArrayList(); - protected String info; - protected int cacheHits = 0; - protected int cacheMisses = 0; - protected int databaseLookups = 0; - protected Stopwatch elapsedTime; - protected Map databaseTime = new HashMap<>(); - protected double cacheLookupTimeMSecs = 0.0; - private List commitThunks = new ArrayList(); - private List abortThunks = new ArrayList(); - private List> asyncOperationFutures = new ArrayList>(); - private boolean aborted = false; - private boolean committed = false; - private long committedAt = 0L; - private BatchOperation batch; - - protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork parent) { - Objects.requireNonNull(session, "containing session cannot be null"); - - this.session = session; - this.parent = parent; - CacheLoader cacheLoader = null; - if (parent != null) { - cacheLoader = - new CacheLoader() { - - Cache cache = parent.getCache(); - - @Override - public Object load(String key) throws CacheLoaderException { - return cache.get(key); - } - - @Override - public Map loadAll(Iterable keys) - throws CacheLoaderException { - Map kvp = new HashMap(); - for (String key : keys) { - kvp.put(key, cache.get(key)); - } - return kvp; - } - }; - } - this.statementCache = - new MapCache(null, "UOW(" + hashCode() + ")", cacheLoader, true); - } - - @Override - public void addDatabaseTime(String name, Stopwatch amount) { - Double time = databaseTime.get(name); - if (time == null) { - databaseTime.put(name, (double) amount.elapsed(TimeUnit.MICROSECONDS)); - } else { - databaseTime.put(name, time + amount.elapsed(TimeUnit.MICROSECONDS)); - } - } - - @Override - public void addCacheLookupTime(Stopwatch amount) { - cacheLookupTimeMSecs += amount.elapsed(TimeUnit.MICROSECONDS); - } - - @Override - public void addNestedUnitOfWork(UnitOfWork uow) { - synchronized (nested) { - 
nested.add((AbstractUnitOfWork) uow); - } - } - - @Override - public synchronized UnitOfWork begin() { - elapsedTime = Stopwatch.createStarted(); - // log.record(txn::start) - return this; - } - - @Override - public String getPurpose() { - return purpose; - } - - @Override - public UnitOfWork setPurpose(String purpose) { - this.purpose = purpose; - return this; - } - - @Override - public void addFuture(CompletableFuture future) { - asyncOperationFutures.add(future); - } - - @Override - public void setInfo(String info) { - this.info = info; - } - - @Override - public void recordCacheAndDatabaseOperationCount(int cache, int ops) { - if (cache > 0) { - cacheHits += cache; - } else { - cacheMisses += Math.abs(cache); - } - if (ops > 0) { - databaseLookups += ops; - } - } - - public String logTimers(String what) { - double e = (double) elapsedTime.elapsed(TimeUnit.MICROSECONDS) / 1000.0; - double d = 0.0; - double c = cacheLookupTimeMSecs / 1000.0; - double fc = (c / e) * 100.0; - String database = ""; - if (databaseTime.size() > 0) { - List dbt = new ArrayList<>(databaseTime.size()); - for (Map.Entry dt : databaseTime.entrySet()) { - double t = dt.getValue() / 1000.0; - d += t; - dbt.add(String.format("%s took %,.3fms %,2.2f%%", dt.getKey(), t, (t / e) * 100.0)); - } - double fd = (d / e) * 100.0; - database = - String.format( - ", %d quer%s (%,.3fms %,2.2f%% - %s)", - databaseLookups, (databaseLookups > 1) ? "ies" : "y", d, fd, String.join(", ", dbt)); - } - String cache = ""; - if (cacheLookupTimeMSecs > 0) { - int cacheLookups = cacheHits + cacheMisses; - cache = - String.format( - " with %d cache lookup%s (%,.3fms %,2.2f%% - %,d hit, %,d miss)", - cacheLookups, cacheLookups > 1 ? "s" : "", c, fc, cacheHits, cacheMisses); - } - String da = ""; - if (databaseTime.size() > 0 || cacheLookupTimeMSecs > 0) { - double dat = d + c; - double daf = (dat / e) * 100; - da = - String.format( - " consuming %,.3fms for data access, or %,2.2f%% of total UOW time.", dat, daf); - } - String x = nestedPurposes.stream().distinct().collect(Collectors.joining(", ")); - String n = - nested - .stream() - .map(uow -> String.valueOf(uow.hashCode())) - .collect(Collectors.joining(", ")); - String s = - String.format( - Locale.US, - "UOW(%s%s) %s in %,.3fms%s%s%s%s%s%s", - hashCode(), - (nested.size() > 0 ? ", [" + n + "]" : ""), - what, - e, - cache, - database, - da, - (purpose == null ? "" : " " + purpose), - (nestedPurposes.isEmpty()) ? "" : ", " + x, - (info == null) ? "" : " " + info); - return s; - } - - private void applyPostCommitFunctions(String what, List thunks) { - if (!thunks.isEmpty()) { - for (CommitThunk f : thunks) { - f.apply(); - } - } - } - - @Override - public Optional cacheLookup(List facets) { - String tableName = CacheUtil.schemaName(facets); - Optional result = Optional.empty(); - for (Facet facet : facets) { - if (!facet.fixed()) { - String columnName = facet.name() + "==" + facet.value(); - Either> eitherValue = cache.get(tableName, columnName); - if (eitherValue != null) { - Object value = deleted; - if (eitherValue.isLeft()) { - value = eitherValue.getLeft(); - } - return Optional.of(value); - } - } - } - - // Be sure to check all enclosing UnitOfWork caches as well, we may be nested. 
- result = checkParentCache(facets); - if (result.isPresent()) { - Object r = result.get(); - Class iface = MappingUtil.getMappingInterface(r); - if (Helenus.entity(iface).isDraftable()) { - cacheUpdate(r, facets); - } else { - cacheUpdate(SerializationUtils.clone((Serializable) r), facets); - } - } - return result; - } - - private Optional checkParentCache(List facets) { - Optional result = Optional.empty(); - if (parent != null) { - result = parent.checkParentCache(facets); - } - return result; - } - - @Override - public List cacheEvict(List facets) { - Either> deletedObjectFacets = Either.right(facets); - String tableName = CacheUtil.schemaName(facets); - Optional optionalValue = cacheLookup(facets); - - for (Facet facet : facets) { - if (!facet.fixed()) { - String columnKey = facet.name() + "==" + facet.value(); - // mark the value identified by the facet to `deleted` - cache.put(tableName, columnKey, deletedObjectFacets); - } - } - - // Now, look for other row/col pairs that referenced the same object, mark them - // `deleted` if the cache had a value before we added the deleted marker objects. - if (optionalValue.isPresent()) { - Object value = optionalValue.get(); - cache - .columnKeySet() - .forEach( - columnKey -> { - Either> eitherCachedValue = cache.get(tableName, columnKey); - if (eitherCachedValue.isLeft()) { - Object cachedValue = eitherCachedValue.getLeft(); - if (cachedValue == value) { - cache.put(tableName, columnKey, deletedObjectFacets); - String[] parts = columnKey.split("=="); - facets.add(new Facet(parts[0], parts[1])); - } - } - }); - } - return facets; - } - - @Override - public Cache getCache() { - return statementCache; - } - - @Override - public Object cacheUpdate(Object value, List facets) { - Object result = null; - String tableName = CacheUtil.schemaName(facets); - for (Facet facet : facets) { - if (!facet.fixed()) { - if (facet.alone()) { - String columnName = facet.name() + "==" + facet.value(); - if (result == null) result = cache.get(tableName, columnName); - cache.put(tableName, columnName, Either.left(value)); - } - } - } - return result; - } - - public void batch(AbstractOperation s) { - if (batch == null) { - batch = new BatchOperation(session); - } - batch.add(s); - } - - private Iterator> getChildNodes() { - return nested.iterator(); - } - - /** - * Checks to see if the work performed between calling begin and now can be committed or not. - * - * @return a function from which to chain work that only happens when commit is successful - * @throws E when the work overlaps with other concurrent writers. - */ - public synchronized PostCommitFunction commit() throws E, TimeoutException { - - if (isDone()) { - return new PostCommitFunction(this, null, null, false); - } - - // Only the outer-most UOW batches statements for commit time, execute them. - if (batch != null) { - committedAt = batch.sync(this); //TODO(gburd): update cache with writeTime... - } - - // All nested UnitOfWork should be committed (not aborted) before calls to - // commit, check. - boolean canCommit = true; - TreeTraverser> traverser = - TreeTraverser.using(node -> node::getChildNodes); - for (AbstractUnitOfWork uow : traverser.postOrderTraversal(this)) { - if (this != uow) { - canCommit &= (!uow.aborted && uow.committed); - } - } - - if (!canCommit) { - - if (parent == null) { - - // Apply all post-commit abort functions, this is the outer-most UnitOfWork. 
- traverser - .postOrderTraversal(this) - .forEach( - uow -> { - applyPostCommitFunctions("aborted", abortThunks); - }); - - elapsedTime.stop(); - if (LOG.isInfoEnabled()) { - LOG.info(logTimers("aborted")); - } - } - - return new PostCommitFunction(this, null, null, false); - } else { - committed = true; - aborted = false; - - if (parent == null) { - - // Apply all post-commit commit functions, this is the outer-most UnitOfWork. - traverser - .postOrderTraversal(this) - .forEach( - uow -> { - applyPostCommitFunctions("committed", uow.commitThunks); - }); - - // Merge our statement cache into the session cache if it exists. - CacheManager cacheManager = session.getCacheManager(); - if (cacheManager != null) { - for (Map.Entry entry : - (Set>) statementCache.unwrap(Map.class).entrySet()) { - String[] keyParts = entry.getKey().split("\\."); - if (keyParts.length == 2) { - String cacheName = keyParts[0]; - String key = keyParts[1]; - if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) { - Cache cache = cacheManager.getCache(cacheName); - if (cache != null) { - Object value = entry.getValue(); - if (value == deleted) { - cache.remove(key); - } else { - cache.put(key.toString(), value); - } - } - } - } - } - } - - // Merge our cache into the session cache. - session.mergeCache(cache); - - // Spoil any lingering futures that may be out there. - asyncOperationFutures.forEach( - f -> - f.completeExceptionally( - new HelenusException( - "Futures must be resolved before their unit of work has committed/aborted."))); - - elapsedTime.stop(); - if (LOG.isInfoEnabled()) { - LOG.info(logTimers("committed")); - } - - return new PostCommitFunction(this, null, null, true); - } else { - - // Merge cache and statistics into parent if there is one. - parent.statementCache.putAll(statementCache.unwrap(Map.class)); - parent.mergeCache(cache); - parent.addBatched(batch); - if (purpose != null) { - parent.nestedPurposes.add(purpose); - } - parent.cacheHits += cacheHits; - parent.cacheMisses += cacheMisses; - parent.databaseLookups += databaseLookups; - parent.cacheLookupTimeMSecs += cacheLookupTimeMSecs; - for (Map.Entry dt : databaseTime.entrySet()) { - String name = dt.getKey(); - if (parent.databaseTime.containsKey(name)) { - double t = parent.databaseTime.get(name); - parent.databaseTime.put(name, t + dt.getValue()); - } else { - parent.databaseTime.put(name, dt.getValue()); - } - } - } - } - // TODO(gburd): hopefully we'll be able to detect conflicts here and so we'd want to... - // else { - // Constructor ctor = clazz.getConstructor(conflictExceptionClass); - // T object = ctor.newInstance(new Object[] { String message }); - // } - return new PostCommitFunction(this, commitThunks, abortThunks, true); - } - - private void addBatched(BatchOperation batch) { - if (batch != null) { - if (this.batch == null) { - this.batch = batch; - } else { - this.batch.addAll(batch); - } - } - } - - /* Explicitly discard the work and mark it as as such in the log. */ - public synchronized void abort() { - if (!aborted) { - aborted = true; - - // Spoil any pending futures created within the context of this unit of work. 
- asyncOperationFutures.forEach( - f -> - f.completeExceptionally( - new HelenusException( - "Futures must be resolved before their unit of work has committed/aborted."))); - - TreeTraverser> traverser = - TreeTraverser.using(node -> node::getChildNodes); - traverser - .postOrderTraversal(this) - .forEach( - uow -> { - applyPostCommitFunctions("aborted", uow.abortThunks); - uow.abortThunks.clear(); - }); - - if (parent == null) { - elapsedTime.stop(); - if (LOG.isInfoEnabled()) { - LOG.info(logTimers("aborted")); - } - } - - // TODO(gburd): when we integrate the transaction support we'll need to... - // log.record(txn::abort) - // cache.invalidateSince(txn::start time) - } - } - - private void mergeCache(Table>> from) { - Table>> to = this.cache; - from.rowMap() - .forEach( - (rowKey, columnMap) -> { - columnMap.forEach( - (columnKey, value) -> { - if (to.contains(rowKey, columnKey)) { - to.put( - rowKey, - columnKey, - Either.left( - CacheUtil.merge( - to.get(rowKey, columnKey).getLeft(), - from.get(rowKey, columnKey).getLeft()))); - } else { - to.put(rowKey, columnKey, from.get(rowKey, columnKey)); - } - }); - }); - } - - public boolean isDone() { - return aborted || committed; - } - - public String describeConflicts() { - return "it's complex..."; - } - - @Override - public void close() throws E { - // Closing a AbstractUnitOfWork will abort iff we've not already aborted or committed this unit of work. - if (aborted == false && committed == false) { - abort(); - } - } - - public boolean hasAborted() { - return aborted; - } - - public boolean hasCommitted() { - return committed; - } - - public long committedAt() { - return committedAt; - } -} diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index c26ace1..97cce0b 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -17,19 +17,14 @@ package net.helenus.core; import static net.helenus.core.Query.eq; -import brave.Tracer; import com.codahale.metrics.MetricRegistry; import com.datastax.driver.core.*; import com.google.common.collect.Table; import java.io.Closeable; import java.io.PrintStream; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.util.*; import java.util.concurrent.Executor; import java.util.function.Function; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.cache.Cache; import javax.cache.CacheManager; @@ -48,24 +43,16 @@ import net.helenus.support.*; import net.helenus.support.Fun.Tuple1; import net.helenus.support.Fun.Tuple2; import net.helenus.support.Fun.Tuple6; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class HelenusSession extends AbstractSessionOperations implements Closeable { - - private static final Logger LOG = LoggerFactory.getLogger(HelenusSession.class); public static final Object deleted = new Object(); - private static final Pattern classNameRegex = - Pattern.compile("^(?:\\w+\\.)+(?:(\\w+)|(\\w+)\\$.*)$"); private final Session session; private final CodecRegistry registry; private final ConsistencyLevel defaultConsistencyLevel; private final boolean defaultQueryIdempotency; private final MetricRegistry metricRegistry; - private final Tracer zipkinTracer; private final PrintStream printStream; - private final Class unitOfWorkClass; private final SessionRepository sessionRepository; private final Executor executor; private final boolean 
dropSchemaOnClose; @@ -89,10 +76,8 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab boolean dropSchemaOnClose, ConsistencyLevel consistencyLevel, boolean defaultQueryIdempotency, - Class unitOfWorkClass, CacheManager cacheManager, - MetricRegistry metricRegistry, - Tracer tracer) { + MetricRegistry metricRegistry) { this.session = session; this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry; this.usingKeyspace = @@ -107,9 +92,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab this.dropSchemaOnClose = dropSchemaOnClose; this.defaultConsistencyLevel = consistencyLevel; this.defaultQueryIdempotency = defaultQueryIdempotency; - this.unitOfWorkClass = unitOfWorkClass; this.metricRegistry = metricRegistry; - this.zipkinTracer = tracer; this.cacheManager = cacheManager; this.valueProvider = new RowColumnValueProvider(this.sessionRepository); @@ -117,6 +100,14 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab this.metadata = session == null ? null : session.getCluster().getMetadata(); } + public UnitOfWork begin() { + return new UnitOfWork(this).begin(); + } + + public UnitOfWork begin(UnitOfWork parent) { + return new UnitOfWork(this, parent).begin(); + } + @Override public Session currentSession() { return session; @@ -187,11 +178,6 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab return valuePreparer; } - @Override - public Tracer getZipkinTracer() { - return zipkinTracer; - } - @Override public MetricRegistry getMetricRegistry() { return metricRegistry; @@ -360,70 +346,6 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab return metadata; } - public UnitOfWork begin() { - return this.begin(null); - } - - private String extractClassNameFromStackFrame(String classNameOnStack) { - String name = null; - Matcher m = classNameRegex.matcher(classNameOnStack); - if (m.find()) { - name = (m.group(1) != null) ? m.group(1) : ((m.group(2) != null) ? 
m.group(2) : name); - } else { - name = classNameOnStack; - } - return name; - } - - public synchronized UnitOfWork begin(UnitOfWork parent) { - try { - Class clazz = unitOfWorkClass; - Constructor ctor = - clazz.getConstructor(HelenusSession.class, UnitOfWork.class); - UnitOfWork uow = ctor.newInstance(this, parent); - if (LOG.isInfoEnabled() && uow.getPurpose() == null) { - StringBuilder purpose = null; - int frame = 0; - StackTraceElement[] trace = Thread.currentThread().getStackTrace(); - String targetClassName = HelenusSession.class.getSimpleName(); - String stackClassName = null; - do { - frame++; - stackClassName = extractClassNameFromStackFrame(trace[frame].getClassName()); - } while (!stackClassName.equals(targetClassName) && frame < trace.length); - do { - frame++; - stackClassName = extractClassNameFromStackFrame(trace[frame].getClassName()); - } while (stackClassName.equals(targetClassName) && frame < trace.length); - if (frame < trace.length) { - purpose = - new StringBuilder() - .append(trace[frame].getClassName()) - .append(".") - .append(trace[frame].getMethodName()) - .append("(") - .append(trace[frame].getFileName()) - .append(":") - .append(trace[frame].getLineNumber()) - .append(")"); - uow.setPurpose(purpose.toString()); - } - } - if (parent != null) { - parent.addNestedUnitOfWork(uow); - } - return uow.begin(); - } catch (NoSuchMethodException - | InvocationTargetException - | InstantiationException - | IllegalAccessException e) { - throw new HelenusException( - String.format( - "Unable to instantiate %s as a UnitOfWork.", unitOfWorkClass.getSimpleName()), - e); - } - } - public SelectOperation select(E pojo) { Objects.requireNonNull( pojo, "supplied object must be a dsl for a registered entity but cannot be null"); diff --git a/src/main/java/net/helenus/core/PostCommitFunction.java b/src/main/java/net/helenus/core/PostCommitFunction.java index c1be72d..0c823cf 100644 --- a/src/main/java/net/helenus/core/PostCommitFunction.java +++ b/src/main/java/net/helenus/core/PostCommitFunction.java @@ -33,7 +33,7 @@ public class PostCommitFunction implements java.util.function.Function exceptionally(CommitThunk after) { + public PostCommitFunction orElse(CommitThunk after) { Objects.requireNonNull(after); if (abortThunks == null) { if (!committed) { diff --git a/src/main/java/net/helenus/core/SessionInitializer.java b/src/main/java/net/helenus/core/SessionInitializer.java index 861cbf1..c66d1db 100644 --- a/src/main/java/net/helenus/core/SessionInitializer.java +++ b/src/main/java/net/helenus/core/SessionInitializer.java @@ -15,7 +15,6 @@ */ package net.helenus.core; -import brave.Tracer; import com.codahale.metrics.MetricRegistry; import com.datastax.driver.core.*; import com.google.common.util.concurrent.MoreExecutors; @@ -47,10 +46,8 @@ public final class SessionInitializer extends AbstractSessionOperations { private ConsistencyLevel consistencyLevel; private boolean idempotent = false; private MetricRegistry metricRegistry = new MetricRegistry(); - private Tracer zipkinTracer; private PrintStream printStream = System.out; private Executor executor = MoreExecutors.directExecutor(); - private Class unitOfWorkClass = UnitOfWorkImpl.class; private SessionRepositoryBuilder sessionRepository; private boolean dropUnusedColumns = false; private boolean dropUnusedIndexes = false; @@ -131,16 +128,6 @@ public final class SessionInitializer extends AbstractSessionOperations { return this; } - public SessionInitializer zipkinTracer(Tracer tracer) { - this.zipkinTracer = tracer; - return 
this; - } - - public SessionInitializer setUnitOfWorkClass(Class e) { - this.unitOfWorkClass = e; - return this; - } - public SessionInitializer consistencyLevel(ConsistencyLevel consistencyLevel) { this.consistencyLevel = consistencyLevel; return this; @@ -292,10 +279,8 @@ public final class SessionInitializer extends AbstractSessionOperations { autoDdl == AutoDdl.CREATE_DROP, consistencyLevel, idempotent, - unitOfWorkClass, cacheManager, - metricRegistry, - zipkinTracer); + metricRegistry); } private void initialize() { diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 5cc1ce6..c5828eb 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -15,16 +15,164 @@ */ package net.helenus.core; -import com.google.common.base.Stopwatch; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeoutException; -import javax.cache.Cache; -import net.helenus.core.cache.Facet; -import net.helenus.core.operation.AbstractOperation; +import static net.helenus.core.HelenusSession.deleted; -public interface UnitOfWork extends AutoCloseable { +import com.google.common.base.Stopwatch; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; +import com.google.common.collect.TreeTraverser; +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import javax.cache.Cache; +import javax.cache.CacheManager; +import javax.cache.integration.CacheLoader; +import javax.cache.integration.CacheLoaderException; +import net.helenus.core.cache.CacheUtil; +import net.helenus.core.cache.Facet; +import net.helenus.core.cache.MapCache; +import net.helenus.core.operation.AbstractOperation; +import net.helenus.core.operation.BatchOperation; +import net.helenus.mapping.MappingUtil; +import net.helenus.support.Either; +import net.helenus.support.HelenusException; +import org.apache.commons.lang3.SerializationUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Encapsulates the concept of a "transaction" as a unit-of-work. 
*/ +public class UnitOfWork implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(UnitOfWork.class); + private static final Pattern classNameRegex = + Pattern.compile("^(?:\\w+\\.)+(?:(\\w+)|(\\w+)\\$.*)$"); + + private final List nested = new ArrayList<>(); + private final HelenusSession session; + public final UnitOfWork parent; + private final Table>> cache = HashBasedTable.create(); + private final MapCache statementCache; + protected String purpose; + protected List nestedPurposes = new ArrayList(); + protected String info; + protected int cacheHits = 0; + protected int cacheMisses = 0; + protected int databaseLookups = 0; + protected final Stopwatch elapsedTime; + protected Map databaseTime = new HashMap<>(); + protected double cacheLookupTimeMSecs = 0.0; + private List commitThunks = new ArrayList(); + private List abortThunks = new ArrayList(); + private List> asyncOperationFutures = new ArrayList>(); + private boolean aborted = false; + private boolean committed = false; + private long committedAt = 0L; + private BatchOperation batch; + + private String extractClassNameFromStackFrame(String classNameOnStack) { + String name = null; + Matcher m = classNameRegex.matcher(classNameOnStack); + if (m.find()) { + name = (m.group(1) != null) ? m.group(1) : ((m.group(2) != null) ? m.group(2) : name); + } else { + name = classNameOnStack; + } + return name; + } + + public UnitOfWork(HelenusSession session) { + this(session, null); + } + + public UnitOfWork(HelenusSession session, UnitOfWork parent) { + Objects.requireNonNull(session, "containing session cannot be null"); + + this.parent = parent; + if (parent != null) { + parent.addNestedUnitOfWork(this); + } + this.session = session; + CacheLoader cacheLoader = null; + if (parent != null) { + cacheLoader = + new CacheLoader() { + + Cache cache = parent.getCache(); + + @Override + public Object load(String key) throws CacheLoaderException { + return cache.get(key); + } + + @Override + public Map loadAll(Iterable keys) + throws CacheLoaderException { + Map kvp = new HashMap(); + for (String key : keys) { + kvp.put(key, cache.get(key)); + } + return kvp; + } + }; + } + this.elapsedTime = Stopwatch.createUnstarted(); + this.statementCache = + new MapCache(null, "UOW(" + hashCode() + ")", cacheLoader, true); + + if (LOG.isInfoEnabled()) { + StringBuilder purpose = null; + int frame = 0; + StackTraceElement[] trace = Thread.currentThread().getStackTrace(); + String targetClassName = HelenusSession.class.getSimpleName(); + String stackClassName = null; + do { + frame++; + stackClassName = extractClassNameFromStackFrame(trace[frame].getClassName()); + } while (!stackClassName.equals(targetClassName) && frame < trace.length); + do { + frame++; + stackClassName = extractClassNameFromStackFrame(trace[frame].getClassName()); + } while (stackClassName.equals(targetClassName) && frame < trace.length); + if (frame < trace.length) { + purpose = + new StringBuilder() + .append(trace[frame].getClassName()) + .append(".") + .append(trace[frame].getMethodName()) + .append("(") + .append(trace[frame].getFileName()) + .append(":") + .append(trace[frame].getLineNumber()) + .append(")"); + this.purpose = purpose.toString(); + } + } + } + + public void addDatabaseTime(String name, Stopwatch amount) { + Double time = databaseTime.get(name); + if (time == null) { + databaseTime.put(name, (double) amount.elapsed(TimeUnit.MICROSECONDS)); + } else { + databaseTime.put(name, time + amount.elapsed(TimeUnit.MICROSECONDS)); + } + } + + 
public void addCacheLookupTime(Stopwatch amount) { + cacheLookupTimeMSecs += amount.elapsed(TimeUnit.MICROSECONDS); + } + + public void addNestedUnitOfWork(UnitOfWork uow) { + synchronized (nested) { + nested.add(uow); + } + } /** * Marks the beginning of a transactional section of work. Will write a @@ -32,54 +180,437 @@ public interface UnitOfWork extends AutoCloseable { * * @return the handle used to commit or abort the work. */ - UnitOfWork begin(); + public synchronized UnitOfWork begin() { + elapsedTime.start(); + // log.record(txn::start) + return this; + } - void addNestedUnitOfWork(UnitOfWork uow); + public String getPurpose() { + return purpose; + } + + public UnitOfWork setPurpose(String purpose) { + this.purpose = purpose; + return this; + } + + public void addFuture(CompletableFuture future) { + asyncOperationFutures.add(future); + } + + public void setInfo(String info) { + this.info = info; + } + + public void recordCacheAndDatabaseOperationCount(int cache, int ops) { + if (cache > 0) { + cacheHits += cache; + } else { + cacheMisses += Math.abs(cache); + } + if (ops > 0) { + databaseLookups += ops; + } + } + + public String logTimers(String what) { + double e = (double) elapsedTime.elapsed(TimeUnit.MICROSECONDS) / 1000.0; + double d = 0.0; + double c = cacheLookupTimeMSecs / 1000.0; + double fc = (c / e) * 100.0; + String database = ""; + if (databaseTime.size() > 0) { + List dbt = new ArrayList<>(databaseTime.size()); + for (Map.Entry dt : databaseTime.entrySet()) { + double t = dt.getValue() / 1000.0; + d += t; + dbt.add(String.format("%s took %,.3fms %,2.2f%%", dt.getKey(), t, (t / e) * 100.0)); + } + double fd = (d / e) * 100.0; + database = + String.format( + ", %d quer%s (%,.3fms %,2.2f%% - %s)", + databaseLookups, (databaseLookups > 1) ? "ies" : "y", d, fd, String.join(", ", dbt)); + } + String cache = ""; + if (cacheLookupTimeMSecs > 0) { + int cacheLookups = cacheHits + cacheMisses; + cache = + String.format( + " with %d cache lookup%s (%,.3fms %,2.2f%% - %,d hit, %,d miss)", + cacheLookups, cacheLookups > 1 ? "s" : "", c, fc, cacheHits, cacheMisses); + } + String da = ""; + if (databaseTime.size() > 0 || cacheLookupTimeMSecs > 0) { + double dat = d + c; + double daf = (dat / e) * 100; + da = + String.format( + " consuming %,.3fms for data access, or %,2.2f%% of total UOW time.", dat, daf); + } + String x = nestedPurposes.stream().distinct().collect(Collectors.joining(", ")); + String n = + nested + .stream() + .map(uow -> String.valueOf(uow.hashCode())) + .collect(Collectors.joining(", ")); + String s = + String.format( + Locale.US, + "UOW(%s%s) %s in %,.3fms%s%s%s%s%s%s", + hashCode(), + (nested.size() > 0 ? ", [" + n + "]" : ""), + what, + e, + cache, + database, + da, + (purpose == null ? "" : " " + purpose), + (nestedPurposes.isEmpty()) ? "" : ", " + x, + (info == null) ? 
"" : " " + info); + return s; + } + + private void applyPostCommitFunctions(String what, List thunks) { + if (!thunks.isEmpty()) { + for (CommitThunk f : thunks) { + f.apply(); + } + } + } + + public Optional cacheLookup(List facets) { + String tableName = CacheUtil.schemaName(facets); + Optional result = Optional.empty(); + for (Facet facet : facets) { + if (!facet.fixed()) { + String columnName = facet.name() + "==" + facet.value(); + Either> eitherValue = cache.get(tableName, columnName); + if (eitherValue != null) { + Object value = deleted; + if (eitherValue.isLeft()) { + value = eitherValue.getLeft(); + } + return Optional.of(value); + } + } + } + + // Be sure to check all enclosing UnitOfWork caches as well, we may be nested. + result = checkParentCache(facets); + if (result.isPresent()) { + Object r = result.get(); + Class iface = MappingUtil.getMappingInterface(r); + if (Helenus.entity(iface).isDraftable()) { + cacheUpdate(r, facets); + } else { + cacheUpdate(SerializationUtils.clone((Serializable) r), facets); + } + } + return result; + } + + private Optional checkParentCache(List facets) { + Optional result = Optional.empty(); + if (parent != null) { + result = parent.checkParentCache(facets); + } + return result; + } + + public List cacheEvict(List facets) { + Either> deletedObjectFacets = Either.right(facets); + String tableName = CacheUtil.schemaName(facets); + Optional optionalValue = cacheLookup(facets); + + for (Facet facet : facets) { + if (!facet.fixed()) { + String columnKey = facet.name() + "==" + facet.value(); + // mark the value identified by the facet to `deleted` + cache.put(tableName, columnKey, deletedObjectFacets); + } + } + + // Now, look for other row/col pairs that referenced the same object, mark them + // `deleted` if the cache had a value before we added the deleted marker objects. + if (optionalValue.isPresent()) { + Object value = optionalValue.get(); + cache + .columnKeySet() + .forEach( + columnKey -> { + Either> eitherCachedValue = cache.get(tableName, columnKey); + if (eitherCachedValue.isLeft()) { + Object cachedValue = eitherCachedValue.getLeft(); + if (cachedValue == value) { + cache.put(tableName, columnKey, deletedObjectFacets); + String[] parts = columnKey.split("=="); + facets.add(new Facet(parts[0], parts[1])); + } + } + }); + } + return facets; + } + + public Cache getCache() { + return statementCache; + } + + public Object cacheUpdate(Object value, List facets) { + Object result = null; + String tableName = CacheUtil.schemaName(facets); + for (Facet facet : facets) { + if (!facet.fixed()) { + if (facet.alone()) { + String columnName = facet.name() + "==" + facet.value(); + if (result == null) result = cache.get(tableName, columnName); + cache.put(tableName, columnName, Either.left(value)); + } + } + } + return result; + } + + public void batch(AbstractOperation s) { + if (batch == null) { + batch = new BatchOperation(session); + } + batch.add(s); + } + + private Iterator getChildNodes() { + return nested.iterator(); + } /** * Checks to see if the work performed between calling begin and now can be committed or not. * * @return a function from which to chain work that only happens when commit is successful - * @throws X when the work overlaps with other concurrent writers. + * @throws HelenusException when the work overlaps with other concurrent writers. 
*/ - PostCommitFunction commit() throws X, TimeoutException; + public synchronized PostCommitFunction commit() + throws HelenusException, TimeoutException { + + if (isDone()) { + return new PostCommitFunction(this, null, null, false); + } + + // Only the outer-most UOW batches statements for commit time, execute them. + if (batch != null) { + committedAt = batch.sync(this); //TODO(gburd): update cache with writeTime... + } + + // All nested UnitOfWork should be committed (not aborted) before calls to + // commit, check. + boolean canCommit = true; + TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); + for (UnitOfWork uow : traverser.postOrderTraversal(this)) { + if (this != uow) { + canCommit &= (!uow.aborted && uow.committed); + } + } + + if (!canCommit) { + + if (parent == null) { + + // Apply all post-commit abort functions, this is the outer-most UnitOfWork. + traverser + .postOrderTraversal(this) + .forEach( + uow -> { + applyPostCommitFunctions("aborted", abortThunks); + }); + + elapsedTime.stop(); + if (LOG.isInfoEnabled()) { + LOG.info(logTimers("aborted")); + } + } + + return new PostCommitFunction(this, null, null, false); + } else { + committed = true; + aborted = false; + + if (parent == null) { + + // Apply all post-commit commit functions, this is the outer-most UnitOfWork. + traverser + .postOrderTraversal(this) + .forEach( + uow -> { + applyPostCommitFunctions("committed", uow.commitThunks); + }); + + // Merge our statement cache into the session cache if it exists. + CacheManager cacheManager = session.getCacheManager(); + if (cacheManager != null) { + for (Map.Entry entry : + (Set>) statementCache.unwrap(Map.class).entrySet()) { + String[] keyParts = entry.getKey().split("\\."); + if (keyParts.length == 2) { + String cacheName = keyParts[0]; + String key = keyParts[1]; + if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) { + Cache cache = cacheManager.getCache(cacheName); + if (cache != null) { + Object value = entry.getValue(); + if (value == deleted) { + cache.remove(key); + } else { + cache.put(key.toString(), value); + } + } + } + } + } + } + + // Merge our cache into the session cache. + session.mergeCache(cache); + + // Spoil any lingering futures that may be out there. + asyncOperationFutures.forEach( + f -> + f.completeExceptionally( + new HelenusException( + "Futures must be resolved before their unit of work has committed/aborted."))); + + elapsedTime.stop(); + if (LOG.isInfoEnabled()) { + LOG.info(logTimers("committed")); + } + + return new PostCommitFunction(this, null, null, true); + } else { + + // Merge cache and statistics into parent if there is one. + parent.statementCache.putAll(statementCache.unwrap(Map.class)); + parent.mergeCache(cache); + parent.addBatched(batch); + if (purpose != null) { + parent.nestedPurposes.add(purpose); + } + parent.cacheHits += cacheHits; + parent.cacheMisses += cacheMisses; + parent.databaseLookups += databaseLookups; + parent.cacheLookupTimeMSecs += cacheLookupTimeMSecs; + for (Map.Entry dt : databaseTime.entrySet()) { + String name = dt.getKey(); + if (parent.databaseTime.containsKey(name)) { + double t = parent.databaseTime.get(name); + parent.databaseTime.put(name, t + dt.getValue()); + } else { + parent.databaseTime.put(name, dt.getValue()); + } + } + } + } + // TODO(gburd): hopefully we'll be able to detect conflicts here and so we'd want to... 
+ // else { + // Constructor ctor = clazz.getConstructor(conflictExceptionClass); + // T object = ctor.newInstance(new Object[] { String message }); + // } + return new PostCommitFunction(this, commitThunks, abortThunks, true); + } + + private void addBatched(BatchOperation batch) { + if (batch != null) { + if (this.batch == null) { + this.batch = batch; + } else { + this.batch.addAll(batch); + } + } + } /** * Explicitly abort the work within this unit of work. Any nested aborted unit of work will * trigger the entire unit of work to commit. */ - void abort(); + public synchronized void abort() { + if (!aborted) { + aborted = true; - boolean hasAborted(); + // Spoil any pending futures created within the context of this unit of work. + asyncOperationFutures.forEach( + f -> + f.completeExceptionally( + new HelenusException( + "Futures must be resolved before their unit of work has committed/aborted."))); - boolean hasCommitted(); + TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); + traverser + .postOrderTraversal(this) + .forEach( + uow -> { + applyPostCommitFunctions("aborted", uow.abortThunks); + uow.abortThunks.clear(); + }); - boolean isDone(); + if (parent == null) { + elapsedTime.stop(); + if (LOG.isInfoEnabled()) { + LOG.info(logTimers("aborted")); + } + } - long committedAt(); + // TODO(gburd): when we integrate the transaction support we'll need to... + // log.record(txn::abort) + // cache.invalidateSince(txn::start time) + } + } - void batch(AbstractOperation operation); + private void mergeCache(Table>> from) { + Table>> to = this.cache; + from.rowMap() + .forEach( + (rowKey, columnMap) -> { + columnMap.forEach( + (columnKey, value) -> { + if (to.contains(rowKey, columnKey)) { + to.put( + rowKey, + columnKey, + Either.left( + CacheUtil.merge( + to.get(rowKey, columnKey).getLeft(), + from.get(rowKey, columnKey).getLeft()))); + } else { + to.put(rowKey, columnKey, from.get(rowKey, columnKey)); + } + }); + }); + } - void addFuture(CompletableFuture future); + public boolean isDone() { + return aborted || committed; + } - Optional cacheLookup(List facets); + public String describeConflicts() { + return "it's complex..."; + } - Cache getCache(); + @Override + public void close() throws HelenusException { + // Closing a UnitOfWork will abort iff we've not already aborted or committed this unit of work. + if (aborted == false && committed == false) { + abort(); + } + } - Object cacheUpdate(Object pojo, List facets); + public boolean hasAborted() { + return aborted; + } - List cacheEvict(List facets); + public boolean hasCommitted() { + return committed; + } - String getPurpose(); - - UnitOfWork setPurpose(String purpose); - - void setInfo(String info); - - void addDatabaseTime(String name, Stopwatch amount); - - void addCacheLookupTime(Stopwatch amount); - - // Cache > 0 means "cache hit", < 0 means cache miss. - void recordCacheAndDatabaseOperationCount(int cache, int database); + public long committedAt() { + return committedAt; + } } diff --git a/src/main/java/net/helenus/core/UnitOfWorkImpl.java b/src/main/java/net/helenus/core/UnitOfWorkImpl.java deleted file mode 100644 index 52cae59..0000000 --- a/src/main/java/net/helenus/core/UnitOfWorkImpl.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (C) 2015 The Helenus Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.helenus.core; - -import net.helenus.support.HelenusException; - -class UnitOfWorkImpl extends AbstractUnitOfWork { - - @SuppressWarnings("unchecked") - public UnitOfWorkImpl(HelenusSession session, UnitOfWork parent) { - super(session, (AbstractUnitOfWork) parent); - } -} diff --git a/src/main/java/net/helenus/core/operation/AbstractOperation.java b/src/main/java/net/helenus/core/operation/AbstractOperation.java index 3bd8c98..9ae3aaa 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOperation.java @@ -41,13 +41,7 @@ public abstract class AbstractOperation> try { ResultSet resultSet = this.execute( - sessionOps, - null, - traceContext, - queryExecutionTimeout, - queryTimeoutUnits, - showValues, - false); + sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false); return transform(resultSet); } finally { context.stop(); @@ -60,14 +54,7 @@ public abstract class AbstractOperation> final Timer.Context context = requestLatency.time(); try { ResultSet resultSet = - execute( - sessionOps, - uow, - traceContext, - queryExecutionTimeout, - queryTimeoutUnits, - showValues, - true); + execute(sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, true); E result = transform(resultSet); return result; } finally { diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index ecb5813..4c7b290 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -98,7 +98,6 @@ public abstract class AbstractOptionalOperation sync(UnitOfWork uow) throws TimeoutException { + public Optional sync(UnitOfWork uow) throws TimeoutException { if (uow == null) return sync(); final Timer.Context context = requestLatency.time(); @@ -206,14 +205,7 @@ public abstract class AbstractOptionalOperation> async(UnitOfWork uow) { + public CompletableFuture> async(UnitOfWork uow) { if (uow == null) return async(); CompletableFuture> f = CompletableFuture.>supplyAsync( diff --git a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java index 61ff695..79855c6 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java @@ -15,8 +15,6 @@ */ package net.helenus.core.operation; -import brave.Tracer; -import brave.propagation.TraceContext; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.RegularStatement; @@ -254,17 +252,6 @@ public abstract class AbstractStatementOperation uow, List facets) { + protected E checkCache(UnitOfWork uow, List facets) { E result = null; Optional optionalCachedResult = Optional.empty(); @@ -331,7 +318,7 @@ public abstract class AbstractStatementOperation uow, E pojo, List 
identifyingFacets) { + protected Object cacheUpdate(UnitOfWork uow, E pojo, List identifyingFacets) { List facets = new ArrayList<>(); Map valueMap = pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index 0fc09a2..440411d 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -99,7 +99,6 @@ public abstract class AbstractStreamOperation { batch.setDefaultTimestamp(timestampGenerator.next()); ResultSet resultSet = this.execute( - sessionOps, - null, - traceContext, - queryExecutionTimeout, - queryTimeoutUnits, - showValues, - false); + sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false); if (!resultSet.wasApplied()) { throw new HelenusException("Failed to apply batch."); } @@ -88,7 +82,7 @@ public class BatchOperation extends Operation { return batch.getDefaultTimestamp(); } - public Long sync(UnitOfWork uow) throws TimeoutException { + public Long sync(UnitOfWork uow) throws TimeoutException { if (operations.size() == 0) return 0L; if (uow == null) return sync(); @@ -99,13 +93,7 @@ public class BatchOperation extends Operation { batch.setDefaultTimestamp(timestampGenerator.next()); ResultSet resultSet = this.execute( - sessionOps, - uow, - traceContext, - queryExecutionTimeout, - queryTimeoutUnits, - showValues, - false); + sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, false); if (!resultSet.wasApplied()) { throw new HelenusException("Failed to apply batch."); } diff --git a/src/main/java/net/helenus/core/operation/Operation.java b/src/main/java/net/helenus/core/operation/Operation.java index 960a836..eac4cf8 100644 --- a/src/main/java/net/helenus/core/operation/Operation.java +++ b/src/main/java/net/helenus/core/operation/Operation.java @@ -15,9 +15,6 @@ */ package net.helenus.core.operation; -import brave.Span; -import brave.Tracer; -import brave.propagation.TraceContext; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; @@ -43,7 +40,6 @@ public abstract class Operation { protected final AbstractSessionOperations sessionOps; protected boolean showValues; - protected TraceContext traceContext; protected long queryExecutionTimeout = 10; protected TimeUnit queryTimeoutUnits = TimeUnit.SECONDS; protected final Meter uowCacheHits; @@ -96,101 +92,79 @@ public abstract class Operation { public ResultSet execute( AbstractSessionOperations session, UnitOfWork uow, - TraceContext traceContext, long timeout, TimeUnit units, boolean showValues, boolean cached) throws TimeoutException { - // Start recording in a Zipkin sub-span our execution time to perform this operation. - Tracer tracer = session.getZipkinTracer(); - Span span = null; - if (tracer != null && traceContext != null) { - span = tracer.newChild(traceContext); + Statement statement = options(buildStatement(cached)); + + if (session.isShowCql()) { + String stmt = + (this instanceof BatchOperation) + ? queryString((BatchOperation) this, showValues) + : queryString(statement, showValues); + session.getPrintStream().println(stmt); + } else if (LOG.isDebugEnabled()) { + String stmt = + (this instanceof BatchOperation) + ? 
queryString((BatchOperation) this, showValues) + : queryString(statement, showValues); + LOG.info("CQL> " + stmt); } + Stopwatch timer = Stopwatch.createStarted(); try { - - if (span != null) { - span.name("cassandra"); - span.start(); - } - - Statement statement = options(buildStatement(cached)); - - if (session.isShowCql()) { - String stmt = - (this instanceof BatchOperation) - ? queryString((BatchOperation) this, showValues) - : queryString(statement, showValues); - session.getPrintStream().println(stmt); - } else if (LOG.isDebugEnabled()) { - String stmt = - (this instanceof BatchOperation) - ? queryString((BatchOperation) this, showValues) - : queryString(statement, showValues); - LOG.info("CQL> " + stmt); - } - - Stopwatch timer = Stopwatch.createStarted(); - try { - ResultSetFuture futureResultSet = session.executeAsync(statement, uow, timer); - if (uow != null) uow.recordCacheAndDatabaseOperationCount(0, 1); - ResultSet resultSet = futureResultSet.getUninterruptibly(timeout, units); - ColumnDefinitions columnDefinitions = resultSet.getColumnDefinitions(); - if (LOG.isDebugEnabled()) { - ExecutionInfo ei = resultSet.getExecutionInfo(); - Host qh = ei.getQueriedHost(); - String oh = - ei.getTriedHosts() - .stream() - .map(Host::getAddress) - .map(InetAddress::toString) - .collect(Collectors.joining(", ")); - ConsistencyLevel cl = ei.getAchievedConsistencyLevel(); - if (cl == null) { - cl = statement.getConsistencyLevel(); - } - int se = ei.getSpeculativeExecutions(); - String warn = ei.getWarnings().stream().collect(Collectors.joining(", ")); - String ri = - String.format( - "%s %s ~%s %s %s%s%sspec-retries: %d", - "server v" + qh.getCassandraVersion(), - qh.getAddress().toString(), - (oh != null && !oh.equals("")) ? " [tried: " + oh + "]" : "", - qh.getDatacenter(), - qh.getRack(), - (cl != null) - ? (" consistency: " - + cl.name() - + " " - + (cl.isDCLocal() ? " DC " : "") - + (cl.isSerial() ? " SC " : "")) - : "", - (warn != null && !warn.equals("")) ? ": " + warn : "", - se); - if (uow != null) uow.setInfo(ri); - else LOG.debug(ri); + ResultSetFuture futureResultSet = session.executeAsync(statement, uow, timer); + if (uow != null) uow.recordCacheAndDatabaseOperationCount(0, 1); + ResultSet resultSet = futureResultSet.getUninterruptibly(timeout, units); + ColumnDefinitions columnDefinitions = resultSet.getColumnDefinitions(); + if (LOG.isDebugEnabled()) { + ExecutionInfo ei = resultSet.getExecutionInfo(); + Host qh = ei.getQueriedHost(); + String oh = + ei.getTriedHosts() + .stream() + .map(Host::getAddress) + .map(InetAddress::toString) + .collect(Collectors.joining(", ")); + ConsistencyLevel cl = ei.getAchievedConsistencyLevel(); + if (cl == null) { + cl = statement.getConsistencyLevel(); } - if (!resultSet.wasApplied() - && !(columnDefinitions.size() > 1 || !columnDefinitions.contains("[applied]"))) { - throw new HelenusException("Operation Failed"); - } - return resultSet; - - } finally { - timer.stop(); - if (uow != null) uow.addDatabaseTime("Cassandra", timer); - log(statement, uow, timer, showValues); + int se = ei.getSpeculativeExecutions(); + String warn = ei.getWarnings().stream().collect(Collectors.joining(", ")); + String ri = + String.format( + "%s %s ~%s %s %s%s%sspec-retries: %d", + "server v" + qh.getCassandraVersion(), + qh.getAddress().toString(), + (oh != null && !oh.equals("")) ? " [tried: " + oh + "]" : "", + qh.getDatacenter(), + qh.getRack(), + (cl != null) + ? (" consistency: " + + cl.name() + + " " + + (cl.isDCLocal() ? " DC " : "") + + (cl.isSerial() ? 
" SC " : "")) + : "", + (warn != null && !warn.equals("")) ? ": " + warn : "", + se); + if (uow != null) uow.setInfo(ri); + else LOG.debug(ri); } + if (!resultSet.wasApplied() + && !(columnDefinitions.size() > 1 || !columnDefinitions.contains("[applied]"))) { + throw new HelenusException("Operation Failed"); + } + return resultSet; } finally { - - if (span != null) { - span.finish(); - } + timer.stop(); + if (uow != null) uow.addDatabaseTime("Cassandra", timer); + log(statement, uow, timer, showValues); } } diff --git a/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java b/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java index 642209b..32ae87f 100644 --- a/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java +++ b/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java @@ -17,13 +17,11 @@ package net.helenus.test.integration.core.simple; import static net.helenus.core.Query.eq; -import com.datastax.driver.core.ResultSet; import java.util.Optional; import java.util.concurrent.TimeoutException; import net.helenus.core.Helenus; import net.helenus.core.HelenusSession; import net.helenus.core.Operator; -import net.helenus.core.operation.UpdateOperation; import net.helenus.support.Fun; import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest; import org.junit.Assert; @@ -184,7 +182,6 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest { .set(user::age, null) .set(user::type, null) .where(user::id, eq(100L)) - .zipkinContext(null) .sync(); Fun.Tuple3 tuple = @@ -217,20 +214,6 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest { } } - public void testZipkin() throws TimeoutException { - session - .update() - .set(user::name, null) - .set(user::age, null) - .set(user::type, null) - .where(user::id, eq(100L)) - .zipkinContext(null) - .sync(); - - UpdateOperation update = session.update(); - update.set(user::name, null).zipkinContext(null).sync(); - } - private void assertUsers(User expected, User actual) { Assert.assertEquals(expected.id(), actual.id()); Assert.assertEquals(expected.name(), actual.name()); diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java index 4872e41..5f9b79a 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java @@ -91,7 +91,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { () -> { q.add("1"); }) - .exceptionally( + .orElse( () -> { q.add("a"); }); @@ -101,7 +101,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { () -> { q.add("2"); }) - .exceptionally( + .orElse( () -> { q.add("b"); }); @@ -110,7 +110,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { () -> { q.add("3"); }) - .exceptionally( + .orElse( () -> { q.add("c"); }); @@ -119,7 +119,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { () -> { q.add("4"); }) - .exceptionally( + .orElse( () -> { q.add("d"); }); @@ -132,7 +132,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { () -> { q.add("5"); }) - .exceptionally( + .orElse( () -> { q.add("e"); });