Add generic type where overlooked. Fix some cache logic.

This commit is contained in:
Greg Burd 2017-10-25 21:59:12 -04:00
parent e5918cd1e8
commit c7e37acc5a
10 changed files with 60 additions and 53 deletions

View file

@ -86,11 +86,11 @@ public abstract class AbstractSessionOperations {
return execute(statement, null, timer, showValues); return execute(statement, null, timer, showValues);
} }
public ResultSet execute(Statement statement, UnitOfWork uow, boolean showValues) { public ResultSet execute(Statement statement, UnitOfWork<?> uow, boolean showValues) {
return execute(statement, uow, null, showValues); return execute(statement, uow, null, showValues);
} }
public ResultSet execute(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) { public ResultSet execute(Statement statement, UnitOfWork<?> uow, Stopwatch timer, boolean showValues) {
return executeAsync(statement, uow, timer, showValues).getUninterruptibly(); return executeAsync(statement, uow, timer, showValues).getUninterruptibly();
} }
@ -102,11 +102,11 @@ public abstract class AbstractSessionOperations {
return executeAsync(statement, null, timer, showValues); return executeAsync(statement, null, timer, showValues);
} }
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, boolean showValues) { public ResultSetFuture executeAsync(Statement statement, UnitOfWork<?> uow, boolean showValues) {
return executeAsync(statement, uow, null, showValues); return executeAsync(statement, uow, null, showValues);
} }
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) { public ResultSetFuture executeAsync(Statement statement, UnitOfWork<?> uow, Stopwatch timer, boolean showValues) {
try { try {
logStatement(statement, showValues); logStatement(statement, showValues);
return currentSession().executeAsync(statement); return currentSession().executeAsync(statement);

View file

@ -2,9 +2,9 @@ package net.helenus.core;
public class ConflictingUnitOfWorkException extends Exception { public class ConflictingUnitOfWorkException extends Exception {
final UnitOfWork uow; final UnitOfWork<?> uow;
ConflictingUnitOfWorkException(UnitOfWork uow) { ConflictingUnitOfWorkException(UnitOfWork<?> uow) {
this.uow = uow; this.uow = uow;
} }
} }

View file

@ -5,10 +5,10 @@ import java.util.Objects;
public class PostCommitFunction<T, R> implements java.util.function.Function<T, R> { public class PostCommitFunction<T, R> implements java.util.function.Function<T, R> {
private final UnitOfWork uow; private final UnitOfWork<?> uow;
private final List<CommitThunk> postCommit; private final List<CommitThunk> postCommit;
PostCommitFunction(UnitOfWork uow, List<CommitThunk> postCommit) { PostCommitFunction(UnitOfWork<?> uow, List<CommitThunk> postCommit) {
this.uow = uow; this.uow = uow;
this.postCommit = postCommit; this.postCommit = postCommit;
} }

View file

@ -46,7 +46,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
} }
} }
public E sync(UnitOfWork uow) {// throws TimeoutException { public E sync(UnitOfWork<?> uow) {// throws TimeoutException {
if (uow == null) if (uow == null)
return sync(); return sync();
@ -71,7 +71,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
}); });
} }
public CompletableFuture<E> async(UnitOfWork uow) { public CompletableFuture<E> async(UnitOfWork<?> uow) {
if (uow == null) if (uow == null)
return async(); return async();
return CompletableFuture.<E>supplyAsync(() -> { return CompletableFuture.<E>supplyAsync(() -> {

View file

@ -154,25 +154,30 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
updateCache = false; updateCache = false;
} }
if (!result.isPresent()) { // Check to see if we fetched the object from the cache
// Formulate the query and execute it against the Cassandra cluster. if (result.isPresent()) {
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, // If we fetched the `deleted` object then the result is null (really
showValues, true); // Optional.empty()).
if (result.get() == deleted) {
// Transform the query result set into the desired shape. result = Optional.empty();
result = transform(resultSet);
}
if (result.get() == deleted) {
return Optional.empty();
} else {
// If we have a result, it wasn't from the UOW cache, and we're caching things
// then we need to put this result into the cache for future requests to find.
if (updateCache && result.isPresent()) {
cacheUpdate(uow, result.get(), getFacets());
} }
return result; } else {
}
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
// Transform the query result set into the desired shape.
result = transform(resultSet);
}
// If we have a result, it wasn't from the UOW cache, and we're caching things
// then we need to put this result into the cache for future requests to find.
if (updateCache && result.isPresent() && result.get() != deleted) {
cacheUpdate(uow, result.get(), getFacets());
}
return result;
} finally { } finally {
context.stop(); context.stop();
} }

View file

@ -160,27 +160,29 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
updateCache = false; updateCache = false;
} }
// Check to see if we fetched the object from the cache
if (resultStream == null) { if (resultStream == null) {
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true); showValues, true);
resultStream = transform(resultSet); resultStream = transform(resultSet);
} }
// If we have a result and we're caching then we need to put it into the cache
// for future requests to find.
if (resultStream != null) {
List<E> again = new ArrayList<>();
List<Facet> facets = getFacets();
resultStream.forEach(result -> {
if (result != deleted) {
if (updateCache) {
cacheUpdate(uow, result, facets);
}
again.add(result);
}
});
resultStream = again.stream();
}
// If we have a result and we're caching then we need to put it into the cache
// for future requests to find.
if (resultStream != null) {
List<E> again = new ArrayList<>();
List<Facet> facets = getFacets();
resultStream.forEach(result -> {
if (result != deleted) {
if (updateCache) {
cacheUpdate(uow, result, facets);
}
again.add(result);
}
});
resultStream = again.stream();
}
return resultStream; return resultStream;
} finally { } finally {
context.stop(); context.stop();

View file

@ -182,7 +182,7 @@ public final class DeleteOperation extends AbstractFilterOperation<ResultSet, De
} }
@Override @Override
public ResultSet sync(UnitOfWork uow) {// throws TimeoutException { public ResultSet sync(UnitOfWork<?> uow) {// throws TimeoutException {
if (uow == null) { if (uow == null) {
return sync(); return sync();
} }

View file

@ -245,7 +245,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
} }
@Override @Override
public T sync(UnitOfWork uow) {// throws TimeoutException { public T sync(UnitOfWork<?> uow) {// throws TimeoutException {
if (uow == null) { if (uow == null) {
return sync(); return sync();
} }

View file

@ -87,8 +87,8 @@ public abstract class Operation<E> {
return query; return query;
} }
public ResultSet execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, long timeout, public ResultSet execute(AbstractSessionOperations session, UnitOfWork<?> uow, TraceContext traceContext,
TimeUnit units, boolean showValues, boolean cached) { // throws TimeoutException { long timeout, TimeUnit units, boolean showValues, boolean cached) { // throws TimeoutException {
// Start recording in a Zipkin sub-span our execution time to perform this // Start recording in a Zipkin sub-span our execution time to perform this
// operation. // operation.
@ -129,7 +129,7 @@ public abstract class Operation<E> {
} }
} }
void log(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) { void log(Statement statement, UnitOfWork<?> uow, Stopwatch timer, boolean showValues) {
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
String uowString = ""; String uowString = "";
if (uow != null) { if (uow != null) {

View file

@ -577,7 +577,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
} }
@Override @Override
public E sync(UnitOfWork uow) {// throws TimeoutException { public E sync(UnitOfWork<?> uow) {// throws TimeoutException {
if (uow == null) { if (uow == null) {
return sync(); return sync();
} }