Remove Zipkin. Revert abstracted UnitOfWork into single class.

This commit is contained in:
Greg Burd 2018-03-28 08:27:49 -04:00
parent 6c245c121e
commit 654f4434bf
17 changed files with 655 additions and 930 deletions

View file

@ -36,9 +36,6 @@
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" /> <orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
<orderEntry type="library" name="Maven: javax.cache:cache-api:1.1.0" level="project" /> <orderEntry type="library" name="Maven: javax.cache:cache-api:1.1.0" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:20.0" level="project" /> <orderEntry type="library" name="Maven: com.google.guava:guava:20.0" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.java:zipkin:1.29.2" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.brave:brave:4.0.6" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.reporter:zipkin-reporter:0.6.12" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-core:3.2.2" level="project" /> <orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-core:3.2.2" level="project" />
<orderEntry type="library" name="Maven: javax.validation:validation-api:2.0.0.CR3" level="project" /> <orderEntry type="library" name="Maven: javax.validation:validation-api:2.0.0.CR3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.13" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.13" level="project" />

14
pom.xml
View file

@ -154,19 +154,7 @@
<version>20.0</version> <version>20.0</version>
</dependency> </dependency>
<!-- Metrics and tracing --> <!-- Metrics -->
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin</artifactId>
<version>1.29.2</version>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave</artifactId>
<version>4.0.6</version>
</dependency>
<dependency> <dependency>
<groupId>io.dropwizard.metrics</groupId> <groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId> <artifactId>metrics-core</artifactId>

View file

@ -15,7 +15,6 @@
*/ */
package net.helenus.core; package net.helenus.core;
import brave.Tracer;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*; import com.datastax.driver.core.*;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
@ -110,10 +109,6 @@ public abstract class AbstractSessionOperations {
} }
} }
public Tracer getZipkinTracer() {
return null;
}
public MetricRegistry getMetricRegistry() { public MetricRegistry getMetricRegistry() {
return null; return null;
} }

View file

@ -1,570 +0,0 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core;
import static net.helenus.core.HelenusSession.deleted;
import com.google.common.base.Stopwatch;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.google.common.collect.TreeTraverser;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.MapCache;
import net.helenus.core.operation.AbstractOperation;
import net.helenus.core.operation.BatchOperation;
import net.helenus.mapping.MappingUtil;
import net.helenus.support.Either;
import net.helenus.support.HelenusException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public abstract class AbstractUnitOfWork<E extends Exception>
implements UnitOfWork<E>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractUnitOfWork.class);
private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>();
private final HelenusSession session;
public final AbstractUnitOfWork<E> parent;
private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
private final MapCache<String, Object> statementCache;
protected String purpose;
protected List<String> nestedPurposes = new ArrayList<String>();
protected String info;
protected int cacheHits = 0;
protected int cacheMisses = 0;
protected int databaseLookups = 0;
protected Stopwatch elapsedTime;
protected Map<String, Double> databaseTime = new HashMap<>();
protected double cacheLookupTimeMSecs = 0.0;
private List<CommitThunk> commitThunks = new ArrayList<CommitThunk>();
private List<CommitThunk> abortThunks = new ArrayList<CommitThunk>();
private List<CompletableFuture<?>> asyncOperationFutures = new ArrayList<CompletableFuture<?>>();
private boolean aborted = false;
private boolean committed = false;
private long committedAt = 0L;
private BatchOperation batch;
protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork<E> parent) {
Objects.requireNonNull(session, "containing session cannot be null");
this.session = session;
this.parent = parent;
CacheLoader cacheLoader = null;
if (parent != null) {
cacheLoader =
new CacheLoader<String, Object>() {
Cache<String, Object> cache = parent.getCache();
@Override
public Object load(String key) throws CacheLoaderException {
return cache.get(key);
}
@Override
public Map<String, Object> loadAll(Iterable<? extends String> keys)
throws CacheLoaderException {
Map<String, Object> kvp = new HashMap<String, Object>();
for (String key : keys) {
kvp.put(key, cache.get(key));
}
return kvp;
}
};
}
this.statementCache =
new MapCache<String, Object>(null, "UOW(" + hashCode() + ")", cacheLoader, true);
}
@Override
public void addDatabaseTime(String name, Stopwatch amount) {
Double time = databaseTime.get(name);
if (time == null) {
databaseTime.put(name, (double) amount.elapsed(TimeUnit.MICROSECONDS));
} else {
databaseTime.put(name, time + amount.elapsed(TimeUnit.MICROSECONDS));
}
}
@Override
public void addCacheLookupTime(Stopwatch amount) {
cacheLookupTimeMSecs += amount.elapsed(TimeUnit.MICROSECONDS);
}
@Override
public void addNestedUnitOfWork(UnitOfWork<E> uow) {
synchronized (nested) {
nested.add((AbstractUnitOfWork<E>) uow);
}
}
@Override
public synchronized UnitOfWork<E> begin() {
elapsedTime = Stopwatch.createStarted();
// log.record(txn::start)
return this;
}
@Override
public String getPurpose() {
return purpose;
}
@Override
public UnitOfWork setPurpose(String purpose) {
this.purpose = purpose;
return this;
}
@Override
public void addFuture(CompletableFuture<?> future) {
asyncOperationFutures.add(future);
}
@Override
public void setInfo(String info) {
this.info = info;
}
@Override
public void recordCacheAndDatabaseOperationCount(int cache, int ops) {
if (cache > 0) {
cacheHits += cache;
} else {
cacheMisses += Math.abs(cache);
}
if (ops > 0) {
databaseLookups += ops;
}
}
public String logTimers(String what) {
double e = (double) elapsedTime.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double d = 0.0;
double c = cacheLookupTimeMSecs / 1000.0;
double fc = (c / e) * 100.0;
String database = "";
if (databaseTime.size() > 0) {
List<String> dbt = new ArrayList<>(databaseTime.size());
for (Map.Entry<String, Double> dt : databaseTime.entrySet()) {
double t = dt.getValue() / 1000.0;
d += t;
dbt.add(String.format("%s took %,.3fms %,2.2f%%", dt.getKey(), t, (t / e) * 100.0));
}
double fd = (d / e) * 100.0;
database =
String.format(
", %d quer%s (%,.3fms %,2.2f%% - %s)",
databaseLookups, (databaseLookups > 1) ? "ies" : "y", d, fd, String.join(", ", dbt));
}
String cache = "";
if (cacheLookupTimeMSecs > 0) {
int cacheLookups = cacheHits + cacheMisses;
cache =
String.format(
" with %d cache lookup%s (%,.3fms %,2.2f%% - %,d hit, %,d miss)",
cacheLookups, cacheLookups > 1 ? "s" : "", c, fc, cacheHits, cacheMisses);
}
String da = "";
if (databaseTime.size() > 0 || cacheLookupTimeMSecs > 0) {
double dat = d + c;
double daf = (dat / e) * 100;
da =
String.format(
" consuming %,.3fms for data access, or %,2.2f%% of total UOW time.", dat, daf);
}
String x = nestedPurposes.stream().distinct().collect(Collectors.joining(", "));
String n =
nested
.stream()
.map(uow -> String.valueOf(uow.hashCode()))
.collect(Collectors.joining(", "));
String s =
String.format(
Locale.US,
"UOW(%s%s) %s in %,.3fms%s%s%s%s%s%s",
hashCode(),
(nested.size() > 0 ? ", [" + n + "]" : ""),
what,
e,
cache,
database,
da,
(purpose == null ? "" : " " + purpose),
(nestedPurposes.isEmpty()) ? "" : ", " + x,
(info == null) ? "" : " " + info);
return s;
}
private void applyPostCommitFunctions(String what, List<CommitThunk> thunks) {
if (!thunks.isEmpty()) {
for (CommitThunk f : thunks) {
f.apply();
}
}
}
@Override
public Optional<Object> cacheLookup(List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets);
Optional<Object> result = Optional.empty();
for (Facet facet : facets) {
if (!facet.fixed()) {
String columnName = facet.name() + "==" + facet.value();
Either<Object, List<Facet>> eitherValue = cache.get(tableName, columnName);
if (eitherValue != null) {
Object value = deleted;
if (eitherValue.isLeft()) {
value = eitherValue.getLeft();
}
return Optional.of(value);
}
}
}
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
result = checkParentCache(facets);
if (result.isPresent()) {
Object r = result.get();
Class<?> iface = MappingUtil.getMappingInterface(r);
if (Helenus.entity(iface).isDraftable()) {
cacheUpdate(r, facets);
} else {
cacheUpdate(SerializationUtils.<Serializable>clone((Serializable) r), facets);
}
}
return result;
}
private Optional<Object> checkParentCache(List<Facet> facets) {
Optional<Object> result = Optional.empty();
if (parent != null) {
result = parent.checkParentCache(facets);
}
return result;
}
@Override
public List<Facet> cacheEvict(List<Facet> facets) {
Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets);
String tableName = CacheUtil.schemaName(facets);
Optional<Object> optionalValue = cacheLookup(facets);
for (Facet facet : facets) {
if (!facet.fixed()) {
String columnKey = facet.name() + "==" + facet.value();
// mark the value identified by the facet to `deleted`
cache.put(tableName, columnKey, deletedObjectFacets);
}
}
// Now, look for other row/col pairs that referenced the same object, mark them
// `deleted` if the cache had a value before we added the deleted marker objects.
if (optionalValue.isPresent()) {
Object value = optionalValue.get();
cache
.columnKeySet()
.forEach(
columnKey -> {
Either<Object, List<Facet>> eitherCachedValue = cache.get(tableName, columnKey);
if (eitherCachedValue.isLeft()) {
Object cachedValue = eitherCachedValue.getLeft();
if (cachedValue == value) {
cache.put(tableName, columnKey, deletedObjectFacets);
String[] parts = columnKey.split("==");
facets.add(new Facet<String>(parts[0], parts[1]));
}
}
});
}
return facets;
}
@Override
public Cache<String, Object> getCache() {
return statementCache;
}
@Override
public Object cacheUpdate(Object value, List<Facet> facets) {
Object result = null;
String tableName = CacheUtil.schemaName(facets);
for (Facet facet : facets) {
if (!facet.fixed()) {
if (facet.alone()) {
String columnName = facet.name() + "==" + facet.value();
if (result == null) result = cache.get(tableName, columnName);
cache.put(tableName, columnName, Either.left(value));
}
}
}
return result;
}
public void batch(AbstractOperation s) {
if (batch == null) {
batch = new BatchOperation(session);
}
batch.add(s);
}
private Iterator<AbstractUnitOfWork<E>> getChildNodes() {
return nested.iterator();
}
/**
* Checks to see if the work performed between calling begin and now can be committed or not.
*
* @return a function from which to chain work that only happens when commit is successful
* @throws E when the work overlaps with other concurrent writers.
*/
public synchronized PostCommitFunction<Void, Void> commit() throws E, TimeoutException {
if (isDone()) {
return new PostCommitFunction(this, null, null, false);
}
// Only the outer-most UOW batches statements for commit time, execute them.
if (batch != null) {
committedAt = batch.sync(this); //TODO(gburd): update cache with writeTime...
}
// All nested UnitOfWork should be committed (not aborted) before calls to
// commit, check.
boolean canCommit = true;
TreeTraverser<AbstractUnitOfWork<E>> traverser =
TreeTraverser.using(node -> node::getChildNodes);
for (AbstractUnitOfWork<E> uow : traverser.postOrderTraversal(this)) {
if (this != uow) {
canCommit &= (!uow.aborted && uow.committed);
}
}
if (!canCommit) {
if (parent == null) {
// Apply all post-commit abort functions, this is the outer-most UnitOfWork.
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("aborted", abortThunks);
});
elapsedTime.stop();
if (LOG.isInfoEnabled()) {
LOG.info(logTimers("aborted"));
}
}
return new PostCommitFunction(this, null, null, false);
} else {
committed = true;
aborted = false;
if (parent == null) {
// Apply all post-commit commit functions, this is the outer-most UnitOfWork.
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("committed", uow.commitThunks);
});
// Merge our statement cache into the session cache if it exists.
CacheManager cacheManager = session.getCacheManager();
if (cacheManager != null) {
for (Map.Entry<String, Object> entry :
(Set<Map.Entry<String, Object>>) statementCache.<Map>unwrap(Map.class).entrySet()) {
String[] keyParts = entry.getKey().split("\\.");
if (keyParts.length == 2) {
String cacheName = keyParts[0];
String key = keyParts[1];
if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) {
Cache<Object, Object> cache = cacheManager.getCache(cacheName);
if (cache != null) {
Object value = entry.getValue();
if (value == deleted) {
cache.remove(key);
} else {
cache.put(key.toString(), value);
}
}
}
}
}
}
// Merge our cache into the session cache.
session.mergeCache(cache);
// Spoil any lingering futures that may be out there.
asyncOperationFutures.forEach(
f ->
f.completeExceptionally(
new HelenusException(
"Futures must be resolved before their unit of work has committed/aborted.")));
elapsedTime.stop();
if (LOG.isInfoEnabled()) {
LOG.info(logTimers("committed"));
}
return new PostCommitFunction(this, null, null, true);
} else {
// Merge cache and statistics into parent if there is one.
parent.statementCache.putAll(statementCache.<Map>unwrap(Map.class));
parent.mergeCache(cache);
parent.addBatched(batch);
if (purpose != null) {
parent.nestedPurposes.add(purpose);
}
parent.cacheHits += cacheHits;
parent.cacheMisses += cacheMisses;
parent.databaseLookups += databaseLookups;
parent.cacheLookupTimeMSecs += cacheLookupTimeMSecs;
for (Map.Entry<String, Double> dt : databaseTime.entrySet()) {
String name = dt.getKey();
if (parent.databaseTime.containsKey(name)) {
double t = parent.databaseTime.get(name);
parent.databaseTime.put(name, t + dt.getValue());
} else {
parent.databaseTime.put(name, dt.getValue());
}
}
}
}
// TODO(gburd): hopefully we'll be able to detect conflicts here and so we'd want to...
// else {
// Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass);
// T object = ctor.newInstance(new Object[] { String message });
// }
return new PostCommitFunction(this, commitThunks, abortThunks, true);
}
private void addBatched(BatchOperation batch) {
if (batch != null) {
if (this.batch == null) {
this.batch = batch;
} else {
this.batch.addAll(batch);
}
}
}
/* Explicitly discard the work and mark it as as such in the log. */
public synchronized void abort() {
if (!aborted) {
aborted = true;
// Spoil any pending futures created within the context of this unit of work.
asyncOperationFutures.forEach(
f ->
f.completeExceptionally(
new HelenusException(
"Futures must be resolved before their unit of work has committed/aborted.")));
TreeTraverser<AbstractUnitOfWork<E>> traverser =
TreeTraverser.using(node -> node::getChildNodes);
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("aborted", uow.abortThunks);
uow.abortThunks.clear();
});
if (parent == null) {
elapsedTime.stop();
if (LOG.isInfoEnabled()) {
LOG.info(logTimers("aborted"));
}
}
// TODO(gburd): when we integrate the transaction support we'll need to...
// log.record(txn::abort)
// cache.invalidateSince(txn::start time)
}
}
private void mergeCache(Table<String, String, Either<Object, List<Facet>>> from) {
Table<String, String, Either<Object, List<Facet>>> to = this.cache;
from.rowMap()
.forEach(
(rowKey, columnMap) -> {
columnMap.forEach(
(columnKey, value) -> {
if (to.contains(rowKey, columnKey)) {
to.put(
rowKey,
columnKey,
Either.left(
CacheUtil.merge(
to.get(rowKey, columnKey).getLeft(),
from.get(rowKey, columnKey).getLeft())));
} else {
to.put(rowKey, columnKey, from.get(rowKey, columnKey));
}
});
});
}
public boolean isDone() {
return aborted || committed;
}
public String describeConflicts() {
return "it's complex...";
}
@Override
public void close() throws E {
// Closing a AbstractUnitOfWork will abort iff we've not already aborted or committed this unit of work.
if (aborted == false && committed == false) {
abort();
}
}
public boolean hasAborted() {
return aborted;
}
public boolean hasCommitted() {
return committed;
}
public long committedAt() {
return committedAt;
}
}

View file

@ -17,19 +17,14 @@ package net.helenus.core;
import static net.helenus.core.Query.eq; import static net.helenus.core.Query.eq;
import brave.Tracer;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*; import com.datastax.driver.core.*;
import com.google.common.collect.Table; import com.google.common.collect.Table;
import java.io.Closeable; import java.io.Closeable;
import java.io.PrintStream; import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.*; import java.util.*;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.function.Function; import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.cache.Cache; import javax.cache.Cache;
import javax.cache.CacheManager; import javax.cache.CacheManager;
@ -48,24 +43,16 @@ import net.helenus.support.*;
import net.helenus.support.Fun.Tuple1; import net.helenus.support.Fun.Tuple1;
import net.helenus.support.Fun.Tuple2; import net.helenus.support.Fun.Tuple2;
import net.helenus.support.Fun.Tuple6; import net.helenus.support.Fun.Tuple6;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelenusSession extends AbstractSessionOperations implements Closeable { public class HelenusSession extends AbstractSessionOperations implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(HelenusSession.class);
public static final Object deleted = new Object(); public static final Object deleted = new Object();
private static final Pattern classNameRegex =
Pattern.compile("^(?:\\w+\\.)+(?:(\\w+)|(\\w+)\\$.*)$");
private final Session session; private final Session session;
private final CodecRegistry registry; private final CodecRegistry registry;
private final ConsistencyLevel defaultConsistencyLevel; private final ConsistencyLevel defaultConsistencyLevel;
private final boolean defaultQueryIdempotency; private final boolean defaultQueryIdempotency;
private final MetricRegistry metricRegistry; private final MetricRegistry metricRegistry;
private final Tracer zipkinTracer;
private final PrintStream printStream; private final PrintStream printStream;
private final Class<? extends UnitOfWork> unitOfWorkClass;
private final SessionRepository sessionRepository; private final SessionRepository sessionRepository;
private final Executor executor; private final Executor executor;
private final boolean dropSchemaOnClose; private final boolean dropSchemaOnClose;
@ -89,10 +76,8 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
boolean dropSchemaOnClose, boolean dropSchemaOnClose,
ConsistencyLevel consistencyLevel, ConsistencyLevel consistencyLevel,
boolean defaultQueryIdempotency, boolean defaultQueryIdempotency,
Class<? extends UnitOfWork> unitOfWorkClass,
CacheManager cacheManager, CacheManager cacheManager,
MetricRegistry metricRegistry, MetricRegistry metricRegistry) {
Tracer tracer) {
this.session = session; this.session = session;
this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry; this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry;
this.usingKeyspace = this.usingKeyspace =
@ -107,9 +92,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
this.dropSchemaOnClose = dropSchemaOnClose; this.dropSchemaOnClose = dropSchemaOnClose;
this.defaultConsistencyLevel = consistencyLevel; this.defaultConsistencyLevel = consistencyLevel;
this.defaultQueryIdempotency = defaultQueryIdempotency; this.defaultQueryIdempotency = defaultQueryIdempotency;
this.unitOfWorkClass = unitOfWorkClass;
this.metricRegistry = metricRegistry; this.metricRegistry = metricRegistry;
this.zipkinTracer = tracer;
this.cacheManager = cacheManager; this.cacheManager = cacheManager;
this.valueProvider = new RowColumnValueProvider(this.sessionRepository); this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
@ -117,6 +100,14 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
this.metadata = session == null ? null : session.getCluster().getMetadata(); this.metadata = session == null ? null : session.getCluster().getMetadata();
} }
public UnitOfWork begin() {
return new UnitOfWork(this).begin();
}
public UnitOfWork begin(UnitOfWork parent) {
return new UnitOfWork(this, parent).begin();
}
@Override @Override
public Session currentSession() { public Session currentSession() {
return session; return session;
@ -187,11 +178,6 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
return valuePreparer; return valuePreparer;
} }
@Override
public Tracer getZipkinTracer() {
return zipkinTracer;
}
@Override @Override
public MetricRegistry getMetricRegistry() { public MetricRegistry getMetricRegistry() {
return metricRegistry; return metricRegistry;
@ -360,70 +346,6 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
return metadata; return metadata;
} }
public UnitOfWork begin() {
return this.begin(null);
}
private String extractClassNameFromStackFrame(String classNameOnStack) {
String name = null;
Matcher m = classNameRegex.matcher(classNameOnStack);
if (m.find()) {
name = (m.group(1) != null) ? m.group(1) : ((m.group(2) != null) ? m.group(2) : name);
} else {
name = classNameOnStack;
}
return name;
}
public synchronized UnitOfWork begin(UnitOfWork parent) {
try {
Class<? extends UnitOfWork> clazz = unitOfWorkClass;
Constructor<? extends UnitOfWork> ctor =
clazz.getConstructor(HelenusSession.class, UnitOfWork.class);
UnitOfWork uow = ctor.newInstance(this, parent);
if (LOG.isInfoEnabled() && uow.getPurpose() == null) {
StringBuilder purpose = null;
int frame = 0;
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
String targetClassName = HelenusSession.class.getSimpleName();
String stackClassName = null;
do {
frame++;
stackClassName = extractClassNameFromStackFrame(trace[frame].getClassName());
} while (!stackClassName.equals(targetClassName) && frame < trace.length);
do {
frame++;
stackClassName = extractClassNameFromStackFrame(trace[frame].getClassName());
} while (stackClassName.equals(targetClassName) && frame < trace.length);
if (frame < trace.length) {
purpose =
new StringBuilder()
.append(trace[frame].getClassName())
.append(".")
.append(trace[frame].getMethodName())
.append("(")
.append(trace[frame].getFileName())
.append(":")
.append(trace[frame].getLineNumber())
.append(")");
uow.setPurpose(purpose.toString());
}
}
if (parent != null) {
parent.addNestedUnitOfWork(uow);
}
return uow.begin();
} catch (NoSuchMethodException
| InvocationTargetException
| InstantiationException
| IllegalAccessException e) {
throw new HelenusException(
String.format(
"Unable to instantiate %s as a UnitOfWork.", unitOfWorkClass.getSimpleName()),
e);
}
}
public <E> SelectOperation<E> select(E pojo) { public <E> SelectOperation<E> select(E pojo) {
Objects.requireNonNull( Objects.requireNonNull(
pojo, "supplied object must be a dsl for a registered entity but cannot be null"); pojo, "supplied object must be a dsl for a registered entity but cannot be null");

View file

@ -33,7 +33,7 @@ public class PostCommitFunction<T, R> implements java.util.function.Function<T,
return this; return this;
} }
public PostCommitFunction<T, R> exceptionally(CommitThunk after) { public PostCommitFunction<T, R> orElse(CommitThunk after) {
Objects.requireNonNull(after); Objects.requireNonNull(after);
if (abortThunks == null) { if (abortThunks == null) {
if (!committed) { if (!committed) {

View file

@ -15,7 +15,6 @@
*/ */
package net.helenus.core; package net.helenus.core;
import brave.Tracer;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*; import com.datastax.driver.core.*;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -47,10 +46,8 @@ public final class SessionInitializer extends AbstractSessionOperations {
private ConsistencyLevel consistencyLevel; private ConsistencyLevel consistencyLevel;
private boolean idempotent = false; private boolean idempotent = false;
private MetricRegistry metricRegistry = new MetricRegistry(); private MetricRegistry metricRegistry = new MetricRegistry();
private Tracer zipkinTracer;
private PrintStream printStream = System.out; private PrintStream printStream = System.out;
private Executor executor = MoreExecutors.directExecutor(); private Executor executor = MoreExecutors.directExecutor();
private Class<? extends UnitOfWork> unitOfWorkClass = UnitOfWorkImpl.class;
private SessionRepositoryBuilder sessionRepository; private SessionRepositoryBuilder sessionRepository;
private boolean dropUnusedColumns = false; private boolean dropUnusedColumns = false;
private boolean dropUnusedIndexes = false; private boolean dropUnusedIndexes = false;
@ -131,16 +128,6 @@ public final class SessionInitializer extends AbstractSessionOperations {
return this; return this;
} }
public SessionInitializer zipkinTracer(Tracer tracer) {
this.zipkinTracer = tracer;
return this;
}
public SessionInitializer setUnitOfWorkClass(Class<? extends UnitOfWork> e) {
this.unitOfWorkClass = e;
return this;
}
public SessionInitializer consistencyLevel(ConsistencyLevel consistencyLevel) { public SessionInitializer consistencyLevel(ConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel; this.consistencyLevel = consistencyLevel;
return this; return this;
@ -292,10 +279,8 @@ public final class SessionInitializer extends AbstractSessionOperations {
autoDdl == AutoDdl.CREATE_DROP, autoDdl == AutoDdl.CREATE_DROP,
consistencyLevel, consistencyLevel,
idempotent, idempotent,
unitOfWorkClass,
cacheManager, cacheManager,
metricRegistry, metricRegistry);
zipkinTracer);
} }
private void initialize() { private void initialize() {

View file

@ -15,16 +15,164 @@
*/ */
package net.helenus.core; package net.helenus.core;
import com.google.common.base.Stopwatch; import static net.helenus.core.HelenusSession.deleted;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import javax.cache.Cache;
import net.helenus.core.cache.Facet;
import net.helenus.core.operation.AbstractOperation;
public interface UnitOfWork<X extends Exception> extends AutoCloseable { import com.google.common.base.Stopwatch;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.google.common.collect.TreeTraverser;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.MapCache;
import net.helenus.core.operation.AbstractOperation;
import net.helenus.core.operation.BatchOperation;
import net.helenus.mapping.MappingUtil;
import net.helenus.support.Either;
import net.helenus.support.HelenusException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public class UnitOfWork implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(UnitOfWork.class);
private static final Pattern classNameRegex =
Pattern.compile("^(?:\\w+\\.)+(?:(\\w+)|(\\w+)\\$.*)$");
private final List<UnitOfWork> nested = new ArrayList<>();
private final HelenusSession session;
public final UnitOfWork parent;
private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
private final MapCache<String, Object> statementCache;
protected String purpose;
protected List<String> nestedPurposes = new ArrayList<String>();
protected String info;
protected int cacheHits = 0;
protected int cacheMisses = 0;
protected int databaseLookups = 0;
protected final Stopwatch elapsedTime;
protected Map<String, Double> databaseTime = new HashMap<>();
protected double cacheLookupTimeMSecs = 0.0;
private List<CommitThunk> commitThunks = new ArrayList<CommitThunk>();
private List<CommitThunk> abortThunks = new ArrayList<CommitThunk>();
private List<CompletableFuture<?>> asyncOperationFutures = new ArrayList<CompletableFuture<?>>();
private boolean aborted = false;
private boolean committed = false;
private long committedAt = 0L;
private BatchOperation batch;
private String extractClassNameFromStackFrame(String classNameOnStack) {
String name = null;
Matcher m = classNameRegex.matcher(classNameOnStack);
if (m.find()) {
name = (m.group(1) != null) ? m.group(1) : ((m.group(2) != null) ? m.group(2) : name);
} else {
name = classNameOnStack;
}
return name;
}
public UnitOfWork(HelenusSession session) {
this(session, null);
}
public UnitOfWork(HelenusSession session, UnitOfWork parent) {
Objects.requireNonNull(session, "containing session cannot be null");
this.parent = parent;
if (parent != null) {
parent.addNestedUnitOfWork(this);
}
this.session = session;
CacheLoader cacheLoader = null;
if (parent != null) {
cacheLoader =
new CacheLoader<String, Object>() {
Cache<String, Object> cache = parent.getCache();
@Override
public Object load(String key) throws CacheLoaderException {
return cache.get(key);
}
@Override
public Map<String, Object> loadAll(Iterable<? extends String> keys)
throws CacheLoaderException {
Map<String, Object> kvp = new HashMap<String, Object>();
for (String key : keys) {
kvp.put(key, cache.get(key));
}
return kvp;
}
};
}
this.elapsedTime = Stopwatch.createUnstarted();
this.statementCache =
new MapCache<String, Object>(null, "UOW(" + hashCode() + ")", cacheLoader, true);
if (LOG.isInfoEnabled()) {
StringBuilder purpose = null;
int frame = 0;
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
String targetClassName = HelenusSession.class.getSimpleName();
String stackClassName = null;
do {
frame++;
stackClassName = extractClassNameFromStackFrame(trace[frame].getClassName());
} while (!stackClassName.equals(targetClassName) && frame < trace.length);
do {
frame++;
stackClassName = extractClassNameFromStackFrame(trace[frame].getClassName());
} while (stackClassName.equals(targetClassName) && frame < trace.length);
if (frame < trace.length) {
purpose =
new StringBuilder()
.append(trace[frame].getClassName())
.append(".")
.append(trace[frame].getMethodName())
.append("(")
.append(trace[frame].getFileName())
.append(":")
.append(trace[frame].getLineNumber())
.append(")");
this.purpose = purpose.toString();
}
}
}
public void addDatabaseTime(String name, Stopwatch amount) {
Double time = databaseTime.get(name);
if (time == null) {
databaseTime.put(name, (double) amount.elapsed(TimeUnit.MICROSECONDS));
} else {
databaseTime.put(name, time + amount.elapsed(TimeUnit.MICROSECONDS));
}
}
public void addCacheLookupTime(Stopwatch amount) {
cacheLookupTimeMSecs += amount.elapsed(TimeUnit.MICROSECONDS);
}
public void addNestedUnitOfWork(UnitOfWork uow) {
synchronized (nested) {
nested.add(uow);
}
}
/** /**
* Marks the beginning of a transactional section of work. Will write a * Marks the beginning of a transactional section of work. Will write a
@ -32,54 +180,437 @@ public interface UnitOfWork<X extends Exception> extends AutoCloseable {
* *
* @return the handle used to commit or abort the work. * @return the handle used to commit or abort the work.
*/ */
UnitOfWork<X> begin(); public synchronized UnitOfWork begin() {
elapsedTime.start();
// log.record(txn::start)
return this;
}
void addNestedUnitOfWork(UnitOfWork<X> uow); public String getPurpose() {
return purpose;
}
public UnitOfWork setPurpose(String purpose) {
this.purpose = purpose;
return this;
}
public void addFuture(CompletableFuture<?> future) {
asyncOperationFutures.add(future);
}
public void setInfo(String info) {
this.info = info;
}
public void recordCacheAndDatabaseOperationCount(int cache, int ops) {
if (cache > 0) {
cacheHits += cache;
} else {
cacheMisses += Math.abs(cache);
}
if (ops > 0) {
databaseLookups += ops;
}
}
public String logTimers(String what) {
double e = (double) elapsedTime.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double d = 0.0;
double c = cacheLookupTimeMSecs / 1000.0;
double fc = (c / e) * 100.0;
String database = "";
if (databaseTime.size() > 0) {
List<String> dbt = new ArrayList<>(databaseTime.size());
for (Map.Entry<String, Double> dt : databaseTime.entrySet()) {
double t = dt.getValue() / 1000.0;
d += t;
dbt.add(String.format("%s took %,.3fms %,2.2f%%", dt.getKey(), t, (t / e) * 100.0));
}
double fd = (d / e) * 100.0;
database =
String.format(
", %d quer%s (%,.3fms %,2.2f%% - %s)",
databaseLookups, (databaseLookups > 1) ? "ies" : "y", d, fd, String.join(", ", dbt));
}
String cache = "";
if (cacheLookupTimeMSecs > 0) {
int cacheLookups = cacheHits + cacheMisses;
cache =
String.format(
" with %d cache lookup%s (%,.3fms %,2.2f%% - %,d hit, %,d miss)",
cacheLookups, cacheLookups > 1 ? "s" : "", c, fc, cacheHits, cacheMisses);
}
String da = "";
if (databaseTime.size() > 0 || cacheLookupTimeMSecs > 0) {
double dat = d + c;
double daf = (dat / e) * 100;
da =
String.format(
" consuming %,.3fms for data access, or %,2.2f%% of total UOW time.", dat, daf);
}
String x = nestedPurposes.stream().distinct().collect(Collectors.joining(", "));
String n =
nested
.stream()
.map(uow -> String.valueOf(uow.hashCode()))
.collect(Collectors.joining(", "));
String s =
String.format(
Locale.US,
"UOW(%s%s) %s in %,.3fms%s%s%s%s%s%s",
hashCode(),
(nested.size() > 0 ? ", [" + n + "]" : ""),
what,
e,
cache,
database,
da,
(purpose == null ? "" : " " + purpose),
(nestedPurposes.isEmpty()) ? "" : ", " + x,
(info == null) ? "" : " " + info);
return s;
}
private void applyPostCommitFunctions(String what, List<CommitThunk> thunks) {
if (!thunks.isEmpty()) {
for (CommitThunk f : thunks) {
f.apply();
}
}
}
public Optional<Object> cacheLookup(List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets);
Optional<Object> result = Optional.empty();
for (Facet facet : facets) {
if (!facet.fixed()) {
String columnName = facet.name() + "==" + facet.value();
Either<Object, List<Facet>> eitherValue = cache.get(tableName, columnName);
if (eitherValue != null) {
Object value = deleted;
if (eitherValue.isLeft()) {
value = eitherValue.getLeft();
}
return Optional.of(value);
}
}
}
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
result = checkParentCache(facets);
if (result.isPresent()) {
Object r = result.get();
Class<?> iface = MappingUtil.getMappingInterface(r);
if (Helenus.entity(iface).isDraftable()) {
cacheUpdate(r, facets);
} else {
cacheUpdate(SerializationUtils.<Serializable>clone((Serializable) r), facets);
}
}
return result;
}
private Optional<Object> checkParentCache(List<Facet> facets) {
Optional<Object> result = Optional.empty();
if (parent != null) {
result = parent.checkParentCache(facets);
}
return result;
}
public List<Facet> cacheEvict(List<Facet> facets) {
Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets);
String tableName = CacheUtil.schemaName(facets);
Optional<Object> optionalValue = cacheLookup(facets);
for (Facet facet : facets) {
if (!facet.fixed()) {
String columnKey = facet.name() + "==" + facet.value();
// mark the value identified by the facet to `deleted`
cache.put(tableName, columnKey, deletedObjectFacets);
}
}
// Now, look for other row/col pairs that referenced the same object, mark them
// `deleted` if the cache had a value before we added the deleted marker objects.
if (optionalValue.isPresent()) {
Object value = optionalValue.get();
cache
.columnKeySet()
.forEach(
columnKey -> {
Either<Object, List<Facet>> eitherCachedValue = cache.get(tableName, columnKey);
if (eitherCachedValue.isLeft()) {
Object cachedValue = eitherCachedValue.getLeft();
if (cachedValue == value) {
cache.put(tableName, columnKey, deletedObjectFacets);
String[] parts = columnKey.split("==");
facets.add(new Facet<String>(parts[0], parts[1]));
}
}
});
}
return facets;
}
public Cache<String, Object> getCache() {
return statementCache;
}
public Object cacheUpdate(Object value, List<Facet> facets) {
Object result = null;
String tableName = CacheUtil.schemaName(facets);
for (Facet facet : facets) {
if (!facet.fixed()) {
if (facet.alone()) {
String columnName = facet.name() + "==" + facet.value();
if (result == null) result = cache.get(tableName, columnName);
cache.put(tableName, columnName, Either.left(value));
}
}
}
return result;
}
public void batch(AbstractOperation s) {
if (batch == null) {
batch = new BatchOperation(session);
}
batch.add(s);
}
private Iterator<UnitOfWork> getChildNodes() {
return nested.iterator();
}
/** /**
* Checks to see if the work performed between calling begin and now can be committed or not. * Checks to see if the work performed between calling begin and now can be committed or not.
* *
* @return a function from which to chain work that only happens when commit is successful * @return a function from which to chain work that only happens when commit is successful
* @throws X when the work overlaps with other concurrent writers. * @throws HelenusException when the work overlaps with other concurrent writers.
*/ */
PostCommitFunction<Void, Void> commit() throws X, TimeoutException; public synchronized PostCommitFunction<Void, Void> commit()
throws HelenusException, TimeoutException {
if (isDone()) {
return new PostCommitFunction(this, null, null, false);
}
// Only the outer-most UOW batches statements for commit time, execute them.
if (batch != null) {
committedAt = batch.sync(this); //TODO(gburd): update cache with writeTime...
}
// All nested UnitOfWork should be committed (not aborted) before calls to
// commit, check.
boolean canCommit = true;
TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
for (UnitOfWork uow : traverser.postOrderTraversal(this)) {
if (this != uow) {
canCommit &= (!uow.aborted && uow.committed);
}
}
if (!canCommit) {
if (parent == null) {
// Apply all post-commit abort functions, this is the outer-most UnitOfWork.
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("aborted", abortThunks);
});
elapsedTime.stop();
if (LOG.isInfoEnabled()) {
LOG.info(logTimers("aborted"));
}
}
return new PostCommitFunction(this, null, null, false);
} else {
committed = true;
aborted = false;
if (parent == null) {
// Apply all post-commit commit functions, this is the outer-most UnitOfWork.
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("committed", uow.commitThunks);
});
// Merge our statement cache into the session cache if it exists.
CacheManager cacheManager = session.getCacheManager();
if (cacheManager != null) {
for (Map.Entry<String, Object> entry :
(Set<Map.Entry<String, Object>>) statementCache.<Map>unwrap(Map.class).entrySet()) {
String[] keyParts = entry.getKey().split("\\.");
if (keyParts.length == 2) {
String cacheName = keyParts[0];
String key = keyParts[1];
if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) {
Cache<Object, Object> cache = cacheManager.getCache(cacheName);
if (cache != null) {
Object value = entry.getValue();
if (value == deleted) {
cache.remove(key);
} else {
cache.put(key.toString(), value);
}
}
}
}
}
}
// Merge our cache into the session cache.
session.mergeCache(cache);
// Spoil any lingering futures that may be out there.
asyncOperationFutures.forEach(
f ->
f.completeExceptionally(
new HelenusException(
"Futures must be resolved before their unit of work has committed/aborted.")));
elapsedTime.stop();
if (LOG.isInfoEnabled()) {
LOG.info(logTimers("committed"));
}
return new PostCommitFunction(this, null, null, true);
} else {
// Merge cache and statistics into parent if there is one.
parent.statementCache.putAll(statementCache.<Map>unwrap(Map.class));
parent.mergeCache(cache);
parent.addBatched(batch);
if (purpose != null) {
parent.nestedPurposes.add(purpose);
}
parent.cacheHits += cacheHits;
parent.cacheMisses += cacheMisses;
parent.databaseLookups += databaseLookups;
parent.cacheLookupTimeMSecs += cacheLookupTimeMSecs;
for (Map.Entry<String, Double> dt : databaseTime.entrySet()) {
String name = dt.getKey();
if (parent.databaseTime.containsKey(name)) {
double t = parent.databaseTime.get(name);
parent.databaseTime.put(name, t + dt.getValue());
} else {
parent.databaseTime.put(name, dt.getValue());
}
}
}
}
// TODO(gburd): hopefully we'll be able to detect conflicts here and so we'd want to...
// else {
// Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass);
// T object = ctor.newInstance(new Object[] { String message });
// }
return new PostCommitFunction(this, commitThunks, abortThunks, true);
}
private void addBatched(BatchOperation batch) {
if (batch != null) {
if (this.batch == null) {
this.batch = batch;
} else {
this.batch.addAll(batch);
}
}
}
/** /**
* Explicitly abort the work within this unit of work. Any nested aborted unit of work will * Explicitly abort the work within this unit of work. Any nested aborted unit of work will
* trigger the entire unit of work to commit. * trigger the entire unit of work to commit.
*/ */
void abort(); public synchronized void abort() {
if (!aborted) {
aborted = true;
boolean hasAborted(); // Spoil any pending futures created within the context of this unit of work.
asyncOperationFutures.forEach(
f ->
f.completeExceptionally(
new HelenusException(
"Futures must be resolved before their unit of work has committed/aborted.")));
boolean hasCommitted(); TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("aborted", uow.abortThunks);
uow.abortThunks.clear();
});
boolean isDone(); if (parent == null) {
elapsedTime.stop();
long committedAt(); if (LOG.isInfoEnabled()) {
LOG.info(logTimers("aborted"));
void batch(AbstractOperation operation); }
}
void addFuture(CompletableFuture<?> future);
// TODO(gburd): when we integrate the transaction support we'll need to...
Optional<Object> cacheLookup(List<Facet> facets); // log.record(txn::abort)
// cache.invalidateSince(txn::start time)
Cache<String, Object> getCache(); }
}
Object cacheUpdate(Object pojo, List<Facet> facets);
private void mergeCache(Table<String, String, Either<Object, List<Facet>>> from) {
List<Facet> cacheEvict(List<Facet> facets); Table<String, String, Either<Object, List<Facet>>> to = this.cache;
from.rowMap()
String getPurpose(); .forEach(
(rowKey, columnMap) -> {
UnitOfWork setPurpose(String purpose); columnMap.forEach(
(columnKey, value) -> {
void setInfo(String info); if (to.contains(rowKey, columnKey)) {
to.put(
void addDatabaseTime(String name, Stopwatch amount); rowKey,
columnKey,
void addCacheLookupTime(Stopwatch amount); Either.left(
CacheUtil.merge(
// Cache > 0 means "cache hit", < 0 means cache miss. to.get(rowKey, columnKey).getLeft(),
void recordCacheAndDatabaseOperationCount(int cache, int database); from.get(rowKey, columnKey).getLeft())));
} else {
to.put(rowKey, columnKey, from.get(rowKey, columnKey));
}
});
});
}
public boolean isDone() {
return aborted || committed;
}
public String describeConflicts() {
return "it's complex...";
}
@Override
public void close() throws HelenusException {
// Closing a UnitOfWork will abort iff we've not already aborted or committed this unit of work.
if (aborted == false && committed == false) {
abort();
}
}
public boolean hasAborted() {
return aborted;
}
public boolean hasCommitted() {
return committed;
}
public long committedAt() {
return committedAt;
}
} }

View file

@ -1,26 +0,0 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core;
import net.helenus.support.HelenusException;
class UnitOfWorkImpl extends AbstractUnitOfWork<HelenusException> {
@SuppressWarnings("unchecked")
public UnitOfWorkImpl(HelenusSession session, UnitOfWork parent) {
super(session, (AbstractUnitOfWork<HelenusException>) parent);
}
}

View file

@ -41,13 +41,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
try { try {
ResultSet resultSet = ResultSet resultSet =
this.execute( this.execute(
sessionOps, sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
null,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
false);
return transform(resultSet); return transform(resultSet);
} finally { } finally {
context.stop(); context.stop();
@ -60,14 +54,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
final Timer.Context context = requestLatency.time(); final Timer.Context context = requestLatency.time();
try { try {
ResultSet resultSet = ResultSet resultSet =
execute( execute(sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, true);
sessionOps,
uow,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
true);
E result = transform(resultSet); E result = transform(resultSet);
return result; return result;
} finally { } finally {

View file

@ -98,7 +98,6 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
this.execute( this.execute(
sessionOps, sessionOps,
null, null,
traceContext,
queryExecutionTimeout, queryExecutionTimeout,
queryTimeoutUnits, queryTimeoutUnits,
showValues, showValues,
@ -125,7 +124,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
} }
} }
public Optional<E> sync(UnitOfWork<?> uow) throws TimeoutException { public Optional<E> sync(UnitOfWork uow) throws TimeoutException {
if (uow == null) return sync(); if (uow == null) return sync();
final Timer.Context context = requestLatency.time(); final Timer.Context context = requestLatency.time();
@ -206,14 +205,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
// Formulate the query and execute it against the Cassandra cluster. // Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = ResultSet resultSet =
execute( execute(sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, true);
sessionOps,
uow,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
true);
// Transform the query result set into the desired shape. // Transform the query result set into the desired shape.
result = transform(resultSet); result = transform(resultSet);
@ -245,7 +237,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
}); });
} }
public CompletableFuture<Optional<E>> async(UnitOfWork<?> uow) { public CompletableFuture<Optional<E>> async(UnitOfWork uow) {
if (uow == null) return async(); if (uow == null) return async();
CompletableFuture<Optional<E>> f = CompletableFuture<Optional<E>> f =
CompletableFuture.<Optional<E>>supplyAsync( CompletableFuture.<Optional<E>>supplyAsync(

View file

@ -15,8 +15,6 @@
*/ */
package net.helenus.core.operation; package net.helenus.core.operation;
import brave.Tracer;
import brave.propagation.TraceContext;
import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement; import com.datastax.driver.core.RegularStatement;
@ -254,17 +252,6 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
return statement; return statement;
} }
public O zipkinContext(TraceContext traceContext) {
if (traceContext != null) {
Tracer tracer = this.sessionOps.getZipkinTracer();
if (tracer != null) {
this.traceContext = traceContext;
}
}
return (O) this;
}
@Override @Override
protected boolean isIdempotentOperation() { protected boolean isIdempotentOperation() {
return idempotent; return idempotent;
@ -317,7 +304,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
return ignoreCache; return ignoreCache;
} }
protected E checkCache(UnitOfWork<?> uow, List<Facet> facets) { protected E checkCache(UnitOfWork uow, List<Facet> facets) {
E result = null; E result = null;
Optional<Object> optionalCachedResult = Optional.empty(); Optional<Object> optionalCachedResult = Optional.empty();
@ -331,7 +318,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
return result; return result;
} }
protected Object cacheUpdate(UnitOfWork<?> uow, E pojo, List<Facet> identifyingFacets) { protected Object cacheUpdate(UnitOfWork uow, E pojo, List<Facet> identifyingFacets) {
List<Facet> facets = new ArrayList<>(); List<Facet> facets = new ArrayList<>();
Map<String, Object> valueMap = Map<String, Object> valueMap =
pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null; pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;

View file

@ -99,7 +99,6 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
this.execute( this.execute(
sessionOps, sessionOps,
null, null,
traceContext,
queryExecutionTimeout, queryExecutionTimeout,
queryTimeoutUnits, queryTimeoutUnits,
showValues, showValues,
@ -203,14 +202,7 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
// Check to see if we fetched the object from the cache // Check to see if we fetched the object from the cache
if (resultStream == null) { if (resultStream == null) {
ResultSet resultSet = ResultSet resultSet =
execute( execute(sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, true);
sessionOps,
uow,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
true);
resultStream = transform(resultSet); resultStream = transform(resultSet);
} }

View file

@ -72,13 +72,7 @@ public class BatchOperation extends Operation<Long> {
batch.setDefaultTimestamp(timestampGenerator.next()); batch.setDefaultTimestamp(timestampGenerator.next());
ResultSet resultSet = ResultSet resultSet =
this.execute( this.execute(
sessionOps, sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
null,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
false);
if (!resultSet.wasApplied()) { if (!resultSet.wasApplied()) {
throw new HelenusException("Failed to apply batch."); throw new HelenusException("Failed to apply batch.");
} }
@ -88,7 +82,7 @@ public class BatchOperation extends Operation<Long> {
return batch.getDefaultTimestamp(); return batch.getDefaultTimestamp();
} }
public Long sync(UnitOfWork<?> uow) throws TimeoutException { public Long sync(UnitOfWork uow) throws TimeoutException {
if (operations.size() == 0) return 0L; if (operations.size() == 0) return 0L;
if (uow == null) return sync(); if (uow == null) return sync();
@ -99,13 +93,7 @@ public class BatchOperation extends Operation<Long> {
batch.setDefaultTimestamp(timestampGenerator.next()); batch.setDefaultTimestamp(timestampGenerator.next());
ResultSet resultSet = ResultSet resultSet =
this.execute( this.execute(
sessionOps, sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
uow,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
false);
if (!resultSet.wasApplied()) { if (!resultSet.wasApplied()) {
throw new HelenusException("Failed to apply batch."); throw new HelenusException("Failed to apply batch.");
} }

View file

@ -15,9 +15,6 @@
*/ */
package net.helenus.core.operation; package net.helenus.core.operation;
import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
@ -43,7 +40,6 @@ public abstract class Operation<E> {
protected final AbstractSessionOperations sessionOps; protected final AbstractSessionOperations sessionOps;
protected boolean showValues; protected boolean showValues;
protected TraceContext traceContext;
protected long queryExecutionTimeout = 10; protected long queryExecutionTimeout = 10;
protected TimeUnit queryTimeoutUnits = TimeUnit.SECONDS; protected TimeUnit queryTimeoutUnits = TimeUnit.SECONDS;
protected final Meter uowCacheHits; protected final Meter uowCacheHits;
@ -96,27 +92,12 @@ public abstract class Operation<E> {
public ResultSet execute( public ResultSet execute(
AbstractSessionOperations session, AbstractSessionOperations session,
UnitOfWork uow, UnitOfWork uow,
TraceContext traceContext,
long timeout, long timeout,
TimeUnit units, TimeUnit units,
boolean showValues, boolean showValues,
boolean cached) boolean cached)
throws TimeoutException { throws TimeoutException {
// Start recording in a Zipkin sub-span our execution time to perform this operation.
Tracer tracer = session.getZipkinTracer();
Span span = null;
if (tracer != null && traceContext != null) {
span = tracer.newChild(traceContext);
}
try {
if (span != null) {
span.name("cassandra");
span.start();
}
Statement statement = options(buildStatement(cached)); Statement statement = options(buildStatement(cached));
if (session.isShowCql()) { if (session.isShowCql()) {
@ -185,13 +166,6 @@ public abstract class Operation<E> {
if (uow != null) uow.addDatabaseTime("Cassandra", timer); if (uow != null) uow.addDatabaseTime("Cassandra", timer);
log(statement, uow, timer, showValues); log(statement, uow, timer, showValues);
} }
} finally {
if (span != null) {
span.finish();
}
}
} }
void log(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) { void log(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {

View file

@ -17,13 +17,11 @@ package net.helenus.test.integration.core.simple;
import static net.helenus.core.Query.eq; import static net.helenus.core.Query.eq;
import com.datastax.driver.core.ResultSet;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import net.helenus.core.Helenus; import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession; import net.helenus.core.HelenusSession;
import net.helenus.core.Operator; import net.helenus.core.Operator;
import net.helenus.core.operation.UpdateOperation;
import net.helenus.support.Fun; import net.helenus.support.Fun;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest; import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
import org.junit.Assert; import org.junit.Assert;
@ -184,7 +182,6 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest {
.set(user::age, null) .set(user::age, null)
.set(user::type, null) .set(user::type, null)
.where(user::id, eq(100L)) .where(user::id, eq(100L))
.zipkinContext(null)
.sync(); .sync();
Fun.Tuple3<String, Integer, UserType> tuple = Fun.Tuple3<String, Integer, UserType> tuple =
@ -217,20 +214,6 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest {
} }
} }
public void testZipkin() throws TimeoutException {
session
.update()
.set(user::name, null)
.set(user::age, null)
.set(user::type, null)
.where(user::id, eq(100L))
.zipkinContext(null)
.sync();
UpdateOperation<ResultSet> update = session.update();
update.set(user::name, null).zipkinContext(null).sync();
}
private void assertUsers(User expected, User actual) { private void assertUsers(User expected, User actual) {
Assert.assertEquals(expected.id(), actual.id()); Assert.assertEquals(expected.id(), actual.id());
Assert.assertEquals(expected.name(), actual.name()); Assert.assertEquals(expected.name(), actual.name());

View file

@ -91,7 +91,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest {
() -> { () -> {
q.add("1"); q.add("1");
}) })
.exceptionally( .orElse(
() -> { () -> {
q.add("a"); q.add("a");
}); });
@ -101,7 +101,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest {
() -> { () -> {
q.add("2"); q.add("2");
}) })
.exceptionally( .orElse(
() -> { () -> {
q.add("b"); q.add("b");
}); });
@ -110,7 +110,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest {
() -> { () -> {
q.add("3"); q.add("3");
}) })
.exceptionally( .orElse(
() -> { () -> {
q.add("c"); q.add("c");
}); });
@ -119,7 +119,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest {
() -> { () -> {
q.add("4"); q.add("4");
}) })
.exceptionally( .orElse(
() -> { () -> {
q.add("d"); q.add("d");
}); });
@ -132,7 +132,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest {
() -> { () -> {
q.add("5"); q.add("5");
}) })
.exceptionally( .orElse(
() -> { () -> {
q.add("e"); q.add("e");
}); });