diff --git a/pom.xml b/pom.xml index e4586cb..ae29573 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 net.helenus helenus-core - 2.0.23-SNAPSHOT + 2.0.24-SNAPSHOT jar helenus diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index 0d8f315..85a3cd1 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -22,10 +22,8 @@ import com.datastax.driver.core.querybuilder.BuiltStatement; import com.google.common.util.concurrent.ListenableFuture; import java.io.PrintStream; import java.util.concurrent.Executor; - import net.helenus.core.operation.AbstractCache; import net.helenus.core.operation.CacheManager; -import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.value.ColumnValuePreparer; import net.helenus.mapping.value.ColumnValueProvider; import net.helenus.support.HelenusException; @@ -115,7 +113,9 @@ public abstract class AbstractSessionOperations { return null; } - public AbstractCache cacheFor(CacheManager.Type type) { return null; } + public AbstractCache cacheFor(CacheManager.Type type) { + return null; + } RuntimeException translateException(RuntimeException e) { if (e instanceof HelenusException) { diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index 7557611..9c4df6b 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -42,6 +42,14 @@ import net.helenus.support.Fun.Tuple1; import net.helenus.support.Fun.Tuple2; import net.helenus.support.Fun.Tuple6; +import java.io.Closeable; +import java.io.PrintStream; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.function.Function; + import static net.helenus.core.Query.eq; public final class HelenusSession extends AbstractSessionOperations implements Closeable { @@ -65,7 +73,6 @@ public final class HelenusSession extends AbstractSessionOperations implements C private final StatementColumnValuePreparer valuePreparer; private final Metadata metadata; private final CacheManager cacheManager; - private UnitOfWork currentUnitOfWork; HelenusSession( Session session, @@ -96,7 +103,6 @@ public final class HelenusSession extends AbstractSessionOperations implements C this.valueProvider = new RowColumnValueProvider(this.sessionRepository); this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository); this.metadata = session.getCluster().getMetadata(); - this.currentUnitOfWork = null; this.cacheManager = new CacheManager(this); } @@ -205,7 +211,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C ColumnValueProvider valueProvider = getValueProvider(); HelenusEntity entity = Helenus.entity(entityClass); - //TODO cache entity + //TODO cache entity return new SelectOperation( this, entity, @@ -498,7 +504,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C @Override public AbstractCache cacheFor(CacheManager.Type type) { - return cacheManager.of(type); + return cacheManager.of(type); } public Session getSession() { diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 17558e7..15d8944 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -1,18 +1,35 @@ package net.helenus.core; import com.diffplug.common.base.Errors; -import java.util.ArrayList; +import com.google.common.collect.TreeTraverser; + +import java.util.*; import java.util.function.Function; /** Encapsulates the concept of a "transaction" as a unit-of-work. */ public class UnitOfWork { - private final HelenusSession session; - private ArrayList nested; + static private final Map all = new HashMap(); + static private final List nested = new ArrayList<>(); - UnitOfWork(HelenusSession session) { - this.session = session; - // log.record(txn::start) + private final HelenusSession session; + private ArrayList postCommit = new ArrayList(); + private boolean aborted = false; + private boolean committed = false; + + /** + * Marks the beginning of a transactional section of work. Will write a record to the shared + * write-ahead log. + * + * @return the handle used to commit or abort the work. + */ + static UnitOfWork begin(HelenusSession session) { + Objects.requireNonNull(session, "containing session cannot be null"); + UnitOfWork uow = new UnitOfWork(session); + synchronized (all) { + all.put(session, uow); + } + return uow; } /** @@ -21,13 +38,37 @@ public class UnitOfWork { * * @return the handle used to commit or abort the work. */ - public UnitOfWork begin() { - if (nested == null) { - nested = new ArrayList(); + static UnitOfWork begin(UnitOfWork parent) { + Objects.requireNonNull(parent, "parent unit of work cannot be null"); + Objects.requireNonNull(all.get(parent), "parent unit of work is not currently active"); + + UnitOfWork uow = new UnitOfWork(parent.session); + synchronized (all) { + all.put(parent.session, uow); + parent.addNestedUnitOfWork(uow); } - UnitOfWork unitOfWork = new UnitOfWork(session); - nested.add(unitOfWork); - return unitOfWork; + return uow; + } + + private UnitOfWork(HelenusSession session) { + this.session = session; + // log.record(txn::start) + } + + private void addNestedUnitOfWork(UnitOfWork uow) { + synchronized (nested) { + nested.add(uow); + } + } + + private void applyPostCommitFunctions() { + for(Function f : postCommit) { + f.apply(null); + } + } + + private Iterator getChildNodes() { + return nested.iterator(); } /** @@ -36,24 +77,58 @@ public class UnitOfWork { * @return a function from which to chain work that only happens when commit is successful * @throws ConflictingUnitOfWorkException when the work overlaps with other concurrent writers. */ - public Function commit() throws ConflictingUnitOfWorkException { - if (nested != null) { - nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit)); - } + public PostCommitFunction commit() throws ConflictingUnitOfWorkException { + // All nested UnitOfWork should be committed (not aborted) before calls to commit, check. + boolean canCommit = true; + TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); + for (UnitOfWork uow : traverser.postOrderTraversal(this)) { canCommit &= (!uow.aborted && uow.committed); } + + traverser.postOrderTraversal(this).forEach(uow -> { uow.applyPostCommitFunctions(); }); + + nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit)); // log.record(txn::provisionalCommit) // examine log for conflicts in read-set and write-set between begin and provisional commit // if (conflict) { throw new ConflictingUnitOfWorkException(this) } // else return function so as to enable commit.andThen(() -> { do something iff commit was successful; }) - return Function.identity(); + + return new PostCommitFunction(this); + } + + + public void rollback() { + abort(); } /** Explicitly discard the work and mark it as as such in the log. */ public void abort() { // log.record(txn::abort) // cache.invalidateSince(txn::start time) + TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); + traverser.postOrderTraversal(this).forEach(uow -> { uow.aborted = true; }); } public String describeConflicts() { return "it's complex..."; } + + private class PostCommitFunction implements java.util.function.Function { + + private final UnitOfWork uow; + + PostCommitFunction(UnitOfWork uow) { + this.uow = uow; + } + + @Override + public PostCommitFunction andThen(Function after) { + Objects.requireNonNull(after); + postCommit.add(after); + return null; + } + + @Override + public R apply(T t) { + return null; + } + } } diff --git a/src/main/java/net/helenus/core/operation/AbstractCache.java b/src/main/java/net/helenus/core/operation/AbstractCache.java index 17a4cef..07375c7 100644 --- a/src/main/java/net/helenus/core/operation/AbstractCache.java +++ b/src/main/java/net/helenus/core/operation/AbstractCache.java @@ -1,38 +1,40 @@ package net.helenus.core.operation; -import java.util.concurrent.ExecutionException; - import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Statement; import com.google.common.cache.Cache; +import java.util.concurrent.ExecutionException; public abstract class AbstractCache { - protected CacheManager.Type type; - protected Cache cache; + protected CacheManager.Type type; + protected Cache cache; - public AbstractCache(CacheManager.Type type, Cache cache) { - this.type = type; - this.cache = cache; + public AbstractCache(CacheManager.Type type, Cache cache) { + this.type = type; + this.cache = cache; + } + + protected abstract ResultSet fetch( + Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) + throws InterruptedException, ExecutionException; + + protected abstract ResultSet mutate( + Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) + throws InterruptedException, ExecutionException; + + public ResultSet apply( + Statement statement, OperationsDelegate delegate, ResultSetFuture futureResultSet) + throws InterruptedException, ExecutionException { + ResultSet resultSet = null; + switch (type) { + case FETCH: + resultSet = fetch(statement, delegate, futureResultSet); + break; + case MUTATE: + resultSet = mutate(statement, delegate, futureResultSet); + break; } - - protected abstract ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) - throws InterruptedException, ExecutionException; - protected abstract ResultSet mutate(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) - throws InterruptedException, ExecutionException; - - public ResultSet apply(Statement statement, OperationsDelegate delegate, ResultSetFuture futureResultSet) - throws InterruptedException, ExecutionException { - ResultSet resultSet = null; - switch (type) { - case FETCH: - resultSet = fetch(statement, delegate, futureResultSet); - break; - case MUTATE: - resultSet = mutate(statement, delegate, futureResultSet); - break; - } - return resultSet; - } - + return resultSet; + } } diff --git a/src/main/java/net/helenus/core/operation/AbstractOperation.java b/src/main/java/net/helenus/core/operation/AbstractOperation.java index 42e1826..9b091cb 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOperation.java @@ -18,19 +18,24 @@ package net.helenus.core.operation; import com.datastax.driver.core.ResultSet; import java.util.concurrent.CompletableFuture; import net.helenus.core.AbstractSessionOperations; +import net.helenus.core.UnitOfWork; public abstract class AbstractOperation> extends AbstractStatementOperation implements OperationsDelegate { public abstract E transform(ResultSet resultSet); - protected AbstractCache getCache() { return null; } + protected AbstractCache getCache() { + return null; + } public boolean cacheable() { return false; } - public CacheKey getCacheKey() { return null; } + public CacheKey getCacheKey() { + return null; + } public AbstractOperation(AbstractSessionOperations sessionOperations) { super(sessionOperations); @@ -42,11 +47,20 @@ public abstract class AbstractOperation> public E sync() { return Executioner.INSTANCE.sync( - sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); + sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); + } + public E sync(UnitOfWork uow) { + return Executioner.INSTANCE.sync( + sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); } public CompletableFuture async() { return Executioner.INSTANCE.async( - sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); + sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); + } + + public CompletableFuture async(UnitOfWork uow) { + return Executioner.INSTANCE.async( + sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index 5246c65..80c4268 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture; import java.util.Optional; import java.util.concurrent.CompletableFuture; import net.helenus.core.AbstractSessionOperations; +import net.helenus.core.UnitOfWork; public abstract class AbstractOptionalOperation> extends AbstractStatementOperation implements OperationsDelegate> { @@ -33,9 +34,13 @@ public abstract class AbstractOptionalOperation transform(ResultSet resultSet); - protected AbstractCache getCache() { return null; } + protected AbstractCache getCache() { + return null; + } - public CacheKey getCacheKey() { return null; } + public CacheKey getCacheKey() { + return null; + } public PreparedOptionalOperation prepare() { return new PreparedOptionalOperation(prepareStatement(), this); @@ -54,13 +59,23 @@ public abstract class AbstractOptionalOperation sync() { - return Executioner.INSTANCE.>sync( - sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); + sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); + } + + public Optional sync(UnitOfWork uow) { + return Executioner.INSTANCE.>sync( + sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); } public CompletableFuture> async() { return Executioner.INSTANCE.>async( - sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); + sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); } + + public CompletableFuture> async(UnitOfWork uow) { + return Executioner.INSTANCE.>async( + sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); + } + } diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index 911d3bb..8a3e81a 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import net.helenus.core.AbstractSessionOperations; +import net.helenus.core.UnitOfWork; public abstract class AbstractStreamOperation> extends AbstractStatementOperation implements OperationsDelegate> { @@ -33,9 +34,13 @@ public abstract class AbstractStreamOperation transform(ResultSet resultSet); - protected AbstractCache getCache() { return null; } + protected AbstractCache getCache() { + return null; + } - public CacheKey getCacheKey() { return null; } + public CacheKey getCacheKey() { + return null; + } public PreparedStreamOperation prepare() { return new PreparedStreamOperation(prepareStatement(), this); @@ -55,11 +60,21 @@ public abstract class AbstractStreamOperation sync() { return Executioner.INSTANCE.>sync( - sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); + sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); + } + + public Stream sync(UnitOfWork uow) { + return Executioner.INSTANCE.>sync( + sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); } public CompletableFuture> async() { return Executioner.INSTANCE.>async( - sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); + sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); + } + + public CompletableFuture> async(UnitOfWork uow) { + return Executioner.INSTANCE.>async( + sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); } } diff --git a/src/main/java/net/helenus/core/operation/BoundOptionalOperation.java b/src/main/java/net/helenus/core/operation/BoundOptionalOperation.java index 071dfab..e64d4fb 100644 --- a/src/main/java/net/helenus/core/operation/BoundOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/BoundOptionalOperation.java @@ -39,10 +39,14 @@ public final class BoundOptionalOperation } @Override - protected AbstractCache getCache() { return delegate.getCache(); } + protected AbstractCache getCache() { + return delegate.getCache(); + } @Override - public CacheKey getCacheKey() { return delegate.getCacheKey(); } + public CacheKey getCacheKey() { + return delegate.getCacheKey(); + } @Override public Statement buildStatement() { diff --git a/src/main/java/net/helenus/core/operation/BoundStreamOperation.java b/src/main/java/net/helenus/core/operation/BoundStreamOperation.java index 4616dbe..2cbf534 100644 --- a/src/main/java/net/helenus/core/operation/BoundStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/BoundStreamOperation.java @@ -36,7 +36,9 @@ public final class BoundStreamOperation } @Override - protected AbstractCache getCache() { return delegate.getCache(); } + protected AbstractCache getCache() { + return delegate.getCache(); + } @Override public Stream transform(ResultSet resultSet) { @@ -44,7 +46,9 @@ public final class BoundStreamOperation } @Override - public CacheKey getCacheKey() { return cacheKey; } + public CacheKey getCacheKey() { + return cacheKey; + } @Override public Statement buildStatement() { diff --git a/src/main/java/net/helenus/core/operation/CacheKey.java b/src/main/java/net/helenus/core/operation/CacheKey.java index 2f08402..99460be 100644 --- a/src/main/java/net/helenus/core/operation/CacheKey.java +++ b/src/main/java/net/helenus/core/operation/CacheKey.java @@ -4,33 +4,38 @@ import com.datastax.driver.core.Statement; public class CacheKey { - private String key; + private String key; - static String of(Statement statement) { - return "use " + statement.getKeyspace() + "; " + statement.toString(); - } + static String of(Statement statement) { + return "use " + statement.getKeyspace() + "; " + statement.toString(); + } - CacheKey() {} + CacheKey() {} - CacheKey(String key) { this.key = key; } + CacheKey(String key) { + this.key = key; + } - public void set(String key) { this.key = key; } + public void set(String key) { + this.key = key; + } - public String toString() { return key; } + public String toString() { + return key; + } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; - CacheKey cacheKey = (CacheKey) o; + CacheKey cacheKey = (CacheKey) o; - return key.equals(cacheKey.key); - } - - @Override - public int hashCode() { - return key.hashCode(); - } + return key.equals(cacheKey.key); + } + @Override + public int hashCode() { + return key.hashCode(); + } } diff --git a/src/main/java/net/helenus/core/operation/CacheManager.java b/src/main/java/net/helenus/core/operation/CacheManager.java index b9e26b4..afddfc3 100644 --- a/src/main/java/net/helenus/core/operation/CacheManager.java +++ b/src/main/java/net/helenus/core/operation/CacheManager.java @@ -1,54 +1,53 @@ package net.helenus.core.operation; import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Statement; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; +import java.util.concurrent.TimeUnit; import net.helenus.core.HelenusSession; -import net.helenus.mapping.HelenusEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - public class CacheManager { - public enum Type { FETCH, MUTATE } + public enum Type { + FETCH, + MUTATE + } - final Logger logger = LoggerFactory.getLogger(getClass()); - final HelenusSession session; + final Logger logger = LoggerFactory.getLogger(getClass()); + final HelenusSession session; - private AbstractCache sessionFetch; + private AbstractCache sessionFetch; - public CacheManager(HelenusSession session) { - this.session = session; + public CacheManager(HelenusSession session) { + this.session = session; - RemovalListener listener = new RemovalListener() { - @Override - public void onRemoval(RemovalNotification n){ - if (n.wasEvicted()) { - String cause = n.getCause().name(); - logger.info(cause); - } + RemovalListener listener = + new RemovalListener() { + @Override + public void onRemoval(RemovalNotification n) { + if (n.wasEvicted()) { + String cause = n.getCause().name(); + logger.info(cause); } + } }; - Cache cache = CacheBuilder.newBuilder() - .maximumSize(10_000) - .expireAfterAccess(20, TimeUnit.MINUTES) - .weakKeys() - .softValues() - .removalListener(listener) - .build(); + Cache cache = + CacheBuilder.newBuilder() + .maximumSize(10_000) + .expireAfterAccess(20, TimeUnit.MINUTES) + .weakKeys() + .softValues() + .removalListener(listener) + .build(); - sessionFetch = new SessionCache(Type.FETCH, this, cache); - } - - public AbstractCache of(CacheManager.Type type) { - return sessionFetch; - } + sessionFetch = new SessionCache(Type.FETCH, this, cache); + } + public AbstractCache of(CacheManager.Type type) { + return sessionFetch; + } } diff --git a/src/main/java/net/helenus/core/operation/Executioner.java b/src/main/java/net/helenus/core/operation/Executioner.java index 361a0c5..0056a79 100644 --- a/src/main/java/net/helenus/core/operation/Executioner.java +++ b/src/main/java/net/helenus/core/operation/Executioner.java @@ -6,94 +6,81 @@ import brave.propagation.TraceContext; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Statement; -import net.helenus.core.AbstractSessionOperations; -import net.helenus.support.HelenusException; - import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import net.helenus.core.AbstractSessionOperations; +import net.helenus.core.UnitOfWork; public enum Executioner { INSTANCE; - E sync( - AbstractSessionOperations session, - Statement statement, - TraceContext traceContext, - OperationsDelegate delegate, - boolean showValues) { - return sync(session, statement, null, traceContext, delegate, showValues); + E sync( + AbstractSessionOperations session, + UnitOfWork uow, + Statement statement, + AbstractCache cache, + TraceContext traceContext, + OperationsDelegate delegate, + boolean showValues) { + ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); + return this.execute( + futureResultSet, session, uow, statement, cache, traceContext, delegate, showValues); + } + + public CompletableFuture async( + AbstractSessionOperations session, + UnitOfWork uow, + Statement statement, + AbstractCache cache, + TraceContext traceContext, + OperationsDelegate delegate, + boolean showValues) { + + ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); + return CompletableFuture.supplyAsync( + () -> + execute( + futureResultSet, session, uow, statement, cache, traceContext, delegate, showValues)); + } + + public E execute( + ResultSetFuture futureResultSet, + AbstractSessionOperations session, + UnitOfWork uow, + Statement statement, + AbstractCache cache, + TraceContext traceContext, + OperationsDelegate delegate, + boolean showValues) { + + Tracer tracer = session.getZipkinTracer(); + Span span = null; + if (tracer != null && traceContext != null) { + span = tracer.newChild(traceContext); } - E sync( - AbstractSessionOperations session, - Statement statement, - AbstractCache cache, - TraceContext traceContext, - OperationsDelegate delegate, - boolean showValues) { - ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); - return this.execute(futureResultSet, session, statement, cache, traceContext, delegate, showValues); - } - - public CompletableFuture async( - AbstractSessionOperations session, - Statement statement, - TraceContext traceContext, - OperationsDelegate delegate, - boolean showValues) { - return async(session, statement, null, traceContext, delegate, showValues); - } - - public CompletableFuture async( - AbstractSessionOperations session, - Statement statement, - AbstractCache cache, - TraceContext traceContext, - OperationsDelegate delegate, - boolean showValues) { - - ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); - return CompletableFuture.supplyAsync(() -> - execute(futureResultSet, session, statement, cache, traceContext, delegate, showValues)); - } - - public E execute(ResultSetFuture futureResultSet, - AbstractSessionOperations session, - Statement statement, - AbstractCache cache, - TraceContext traceContext, - OperationsDelegate delegate, - boolean showValues) { - - Tracer tracer = session.getZipkinTracer(); - Span span = null; - if (tracer != null && traceContext != null) { - span = tracer.newChild(traceContext); + try { + if (span != null) { + span.name("cassandra"); + span.start(); } - try { - if (span != null) { - span.name("cassandra"); - span.start(); - } - - ResultSet resultSet; - if (cache != null) { - resultSet = cache.apply(statement, delegate, futureResultSet); - } else { - resultSet = futureResultSet.get(); - } - - E result = delegate.transform(resultSet); - - return result; - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } finally { - if (span != null) { - span.finish(); - } + ResultSet resultSet; + if (cache != null) { + resultSet = cache.apply(statement, delegate, futureResultSet); + } else { + resultSet = futureResultSet.get(); } - } + E result = delegate.transform(resultSet); + + return result; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } finally { + if (span != null) { + span.finish(); + } + } + } } diff --git a/src/main/java/net/helenus/core/operation/OperationsDelegate.java b/src/main/java/net/helenus/core/operation/OperationsDelegate.java index 40ebfe3..eb3a7f2 100644 --- a/src/main/java/net/helenus/core/operation/OperationsDelegate.java +++ b/src/main/java/net/helenus/core/operation/OperationsDelegate.java @@ -4,5 +4,6 @@ import com.datastax.driver.core.ResultSet; public interface OperationsDelegate { E transform(ResultSet resultSet); + CacheKey getCacheKey(); } diff --git a/src/main/java/net/helenus/core/operation/PreparedStreamOperation.java b/src/main/java/net/helenus/core/operation/PreparedStreamOperation.java index 97cca27..d070c72 100644 --- a/src/main/java/net/helenus/core/operation/PreparedStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/PreparedStreamOperation.java @@ -17,7 +17,6 @@ package net.helenus.core.operation; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; - import java.util.regex.Pattern; public final class PreparedStreamOperation { @@ -39,8 +38,11 @@ public final class PreparedStreamOperation { BoundStatement boundStatement = preparedStatement.bind(params); - String key = "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString(); - for (Object param : params) { key = key.replaceFirst(Pattern.quote("?"), param.toString()); } + String key = + "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString(); + for (Object param : params) { + key = key.replaceFirst(Pattern.quote("?"), param.toString()); + } return new BoundStreamOperation(boundStatement, new CacheKey(key), operation); } diff --git a/src/main/java/net/helenus/core/operation/SelectFirstOperation.java b/src/main/java/net/helenus/core/operation/SelectFirstOperation.java index e1e3685..3716a34 100644 --- a/src/main/java/net/helenus/core/operation/SelectFirstOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectFirstOperation.java @@ -46,5 +46,4 @@ public final class SelectFirstOperation public Optional transform(ResultSet resultSet) { return src.transform(resultSet).findFirst(); } - } diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index ead9332..2a467ba 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -46,29 +46,27 @@ public final class SelectOperation extends AbstractFilterStreamOperation() { + new Function() { - @Override - public E apply(Row source) { + @Override + public E apply(Row source) { - ColumnValueProvider valueProvider = sessionOps.getValueProvider(); - Object[] arr = new Object[props.size()]; + ColumnValueProvider valueProvider = sessionOps.getValueProvider(); + Object[] arr = new Object[props.size()]; - int i = 0; - for (HelenusPropertyNode p : props) { - Object value = valueProvider.getColumnValue(source, -1, p.getProperty()); - arr[i++] = value; - } - - return (E) Fun.ArrayTuple.of(arr); - } - }; + int i = 0; + for (HelenusPropertyNode p : props) { + Object value = valueProvider.getColumnValue(source, -1, p.getProperty()); + arr[i++] = value; + } + return (E) Fun.ArrayTuple.of(arr); + } + }; } public SelectOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity) { @@ -76,38 +74,35 @@ public final class SelectOperation extends AbstractFilterStreamOperation new HelenusPropertyNode(p, Optional.empty())) - .forEach(p -> this.props.add(p)); - + .getOrderedProperties() + .stream() + .map(p -> new HelenusPropertyNode(p, Optional.empty())) + .forEach(p -> this.props.add(p)); } public SelectOperation( - AbstractSessionOperations sessionOperations, - HelenusEntity entity, - Function rowMapper) { + AbstractSessionOperations sessionOperations, + HelenusEntity entity, + Function rowMapper) { super(sessionOperations); this.rowMapper = rowMapper; entity - .getOrderedProperties() - .stream() - .map(p -> new HelenusPropertyNode(p, Optional.empty())) - .forEach(p -> this.props.add(p)); - + .getOrderedProperties() + .stream() + .map(p -> new HelenusPropertyNode(p, Optional.empty())) + .forEach(p -> this.props.add(p)); } public SelectOperation( - AbstractSessionOperations sessionOperations, - Function rowMapper, - HelenusPropertyNode... props) { + AbstractSessionOperations sessionOperations, + Function rowMapper, + HelenusPropertyNode... props) { super(sessionOperations); this.rowMapper = rowMapper; Collections.addAll(this.props, props); - } public CountOperation count() { @@ -119,10 +114,10 @@ public final class SelectOperation extends AbstractFilterStreamOperation extends AbstractFilterStreamOperation( - this, - (r) -> { - Map map = new ValueProviderMap(r, sessionOps.getValueProvider(), entity); - return (R) Helenus.map(entityClass, map); - }); + this, + (r) -> { + Map map = new ValueProviderMap(r, sessionOps.getValueProvider(), entity); + return (R) Helenus.map(entityClass, map); + }); } public SelectTransformingOperation map(Function fn) { @@ -197,10 +192,10 @@ public final class SelectOperation extends AbstractFilterStreamOperation extends AbstractFilterStreamOperation extends AbstractFilterStreamOperation) - StreamSupport.stream( - Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED), - false); + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED), + false); } } diff --git a/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java b/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java index 00fff1e..4f19e7a 100644 --- a/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java @@ -41,11 +41,12 @@ public final class SelectTransformingOperation } @Override - protected AbstractCache getCache() { return src.getCache(); } + protected AbstractCache getCache() { + return src.getCache(); + } @Override public Stream transform(ResultSet resultSet) { return src.transform(resultSet).map(fn); } - } diff --git a/src/main/java/net/helenus/core/operation/SessionCache.java b/src/main/java/net/helenus/core/operation/SessionCache.java index 8cbfda5..7929663 100644 --- a/src/main/java/net/helenus/core/operation/SessionCache.java +++ b/src/main/java/net/helenus/core/operation/SessionCache.java @@ -1,65 +1,63 @@ package net.helenus.core.operation; -import java.util.concurrent.ExecutionException; - import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Statement; -import com.datastax.driver.core.querybuilder.Select; import com.google.common.cache.Cache; - +import java.util.concurrent.ExecutionException; public class SessionCache extends AbstractCache { - private final CacheManager manager; + private final CacheManager manager; - SessionCache(CacheManager.Type type, CacheManager manager, Cache cache) { - super(type, cache); - this.manager = manager; - } + SessionCache(CacheManager.Type type, CacheManager manager, Cache cache) { + super(type, cache); + this.manager = manager; + } - protected ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) - throws InterruptedException, ExecutionException { - final CacheKey key = delegate.getCacheKey(); - final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString(); - ResultSet resultSet = null; - if (cacheKey == null) { - resultSet = resultSetFuture.get(); - } else { - resultSet = cache.getIfPresent(cacheKey); - if (resultSet == null) { - resultSet = resultSetFuture.get(); - if (resultSet != null) { - planEvictionFor(statement); - cache.put(cacheKey, resultSet); - } - } + protected ResultSet fetch( + Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) + throws InterruptedException, ExecutionException { + final CacheKey key = delegate.getCacheKey(); + final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString(); + ResultSet resultSet = null; + if (cacheKey == null) { + resultSet = resultSetFuture.get(); + } else { + resultSet = cache.getIfPresent(cacheKey); + if (resultSet == null) { + resultSet = resultSetFuture.get(); + if (resultSet != null) { + planEvictionFor(statement); + cache.put(cacheKey, resultSet); } - return resultSet; + } } + return resultSet; + } - protected ResultSet mutate(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) - throws InterruptedException, ExecutionException { - CacheKey key = delegate.getCacheKey(); - final String cacheKey = key == null ? statement.toString() : key.toString(); - ResultSet resultSet = resultSetFuture.get(); - if (cacheKey != null && resultSet != null) { - planEvictionFor(statement); - //manager.evictIfNecessary(statement, delegate); - cache.put(cacheKey, resultSet); - } - return resultSet; + protected ResultSet mutate( + Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) + throws InterruptedException, ExecutionException { + CacheKey key = delegate.getCacheKey(); + final String cacheKey = key == null ? statement.toString() : key.toString(); + ResultSet resultSet = resultSetFuture.get(); + if (cacheKey != null && resultSet != null) { + planEvictionFor(statement); + //manager.evictIfNecessary(statement, delegate); + cache.put(cacheKey, resultSet); } + return resultSet; + } - private void planEvictionFor(Statement statement) { - //((Select)statement).table + statement.where.clauses.length == 0 - //TTL for rows read - } - - public ResultSet get(Statement statement, OperationsDelegate delegate) { - final CacheKey key = delegate.getCacheKey(); - final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString(); - return cache.getIfPresent(cacheKey); - } + private void planEvictionFor(Statement statement) { + //((Select)statement).table + statement.where.clauses.length == 0 + //TTL for rows read + } + public ResultSet get(Statement statement, OperationsDelegate delegate) { + final CacheKey key = delegate.getCacheKey(); + final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString(); + return cache.getIfPresent(cacheKey); + } } diff --git a/src/test/java/net/helenus/test/integration/core/prepared/Car.java b/src/test/java/net/helenus/test/integration/core/prepared/Car.java index 5320978..c5fa342 100644 --- a/src/test/java/net/helenus/test/integration/core/prepared/Car.java +++ b/src/test/java/net/helenus/test/integration/core/prepared/Car.java @@ -16,7 +16,6 @@ package net.helenus.test.integration.core.prepared; import java.math.BigDecimal; - import net.helenus.core.annotation.Cacheable; import net.helenus.mapping.annotation.PartitionKey; import net.helenus.mapping.annotation.Table; diff --git a/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java b/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java index 564958d..f471e81 100644 --- a/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java +++ b/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java @@ -92,13 +92,13 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest { // select row and map to entity User actual = - session - .selectAll(User.class) - .mapTo(User.class) - .where(user::id, eq(100L)) - .sync() - .findFirst() - .get(); + session + .selectAll(User.class) + .mapTo(User.class) + .where(user::id, eq(100L)) + .sync() + .findFirst() + .get(); assertUsers(newUser, actual); // select as object @@ -157,7 +157,7 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest { name = (String) session - .select() + .select() .column(user::name) .where(user::id, eq(100L)) .sync() @@ -198,10 +198,10 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest { }).sync(); session - .update(user::name, "albert") - .set(user::age, 35) - .where(user::id, Operator.EQ, 100L) - .sync(); + .update(user::name, "albert") + .set(user::age, 35) + .where(user::id, Operator.EQ, 100L) + .sync(); long cnt = session.count(user).where(user::id, Operator.EQ, 100L).sync(); Assert.assertEquals(1L, cnt);