Changes to andThen/orElse and adding exceptionally logic. On commit one or the other of andThen or orElse will fire based on the path (commit, abort). If during any of those an exception is thrown the exceptionally is called with the exception.

This commit is contained in:
Greg Burd 2018-04-02 11:45:49 -04:00
parent b449817659
commit 662a697d03
6 changed files with 95 additions and 49 deletions

View file

@ -1,6 +0,0 @@
package net.helenus.core;
@FunctionalInterface
public interface CommitThunk {
void apply();
}

View file

@ -2,48 +2,75 @@ package net.helenus.core;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import net.helenus.support.CheckedRunnable;
public class PostCommitFunction<T, R> implements java.util.function.Function<T, R> {
public static final PostCommitFunction<Void, Void> NULL_ABORT = new PostCommitFunction<>(null, null, false);
public static final PostCommitFunction<Void, Void> NULL_COMMIT = new PostCommitFunction<>(null, null, true);
public static final PostCommitFunction<Void, Void> NULL_ABORT = new PostCommitFunction<Void, Void>(null, null, null, false);
public static final PostCommitFunction<Void, Void> NULL_COMMIT = new PostCommitFunction<Void, Void>(null, null, null, true);
private final List<CommitThunk> commitThunks;
private final List<CommitThunk> abortThunks;
private final List<CheckedRunnable> commitThunks;
private final List<CheckedRunnable> abortThunks;
private Consumer<? super Throwable> exceptionallyThunk;
private boolean committed;
PostCommitFunction(
List<CommitThunk> postCommit,
List<CommitThunk> abortThunks,
PostCommitFunction(List<CheckedRunnable> postCommit, List<CheckedRunnable> abortThunks,
Consumer<? super Throwable> exceptionallyThunk,
boolean committed) {
this.commitThunks = postCommit;
this.abortThunks = abortThunks;
this.exceptionallyThunk = exceptionallyThunk;
this.committed = committed;
}
public PostCommitFunction<T, R> andThen(CommitThunk after) {
private void apply(CheckedRunnable... fns) {
try {
for (CheckedRunnable fn : fns) {
fn.run();
}
} catch (Throwable t) {
if (exceptionallyThunk != null) {
exceptionallyThunk.accept(t);
}
}
}
public PostCommitFunction<T, R> andThen(CheckedRunnable... after) {
Objects.requireNonNull(after);
if (commitThunks == null) {
if (committed) {
after.apply();
apply(after);
}
} else {
commitThunks.add(after);
for (CheckedRunnable fn : after) {
commitThunks.add(fn);
}
}
return this;
}
public PostCommitFunction<T, R> orElse(CommitThunk after) {
public PostCommitFunction<T, R> orElse(CheckedRunnable... after) {
Objects.requireNonNull(after);
if (abortThunks == null) {
if (!committed) {
after.apply();
apply(after);
}
} else {
abortThunks.add(after);
for (CheckedRunnable fn : after) {
abortThunks.add(fn);
}
}
return this;
}
public PostCommitFunction<T, R> exceptionally(Consumer<? super Throwable> fn) {
Objects.requireNonNull(fn);
exceptionallyThunk = fn;
return this;
}
@Override
public R apply(T t) {
return null;

View file

@ -36,6 +36,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheManager;
@ -54,6 +55,7 @@ import net.helenus.core.cache.MapCache;
import net.helenus.core.operation.AbstractOperation;
import net.helenus.core.operation.BatchOperation;
import net.helenus.mapping.MappingUtil;
import net.helenus.support.CheckedRunnable;
import net.helenus.support.Either;
import net.helenus.support.HelenusException;
import org.apache.commons.lang3.SerializationUtils;
@ -78,9 +80,10 @@ public class UnitOfWork implements AutoCloseable {
protected int databaseLookups = 0;
protected final Stopwatch elapsedTime;
protected Map<String, Double> databaseTime = new HashMap<>();
protected double cacheLookupTimeMSecs = 0.0d;
private List<CommitThunk> commitThunks = new ArrayList<CommitThunk>();
private List<CommitThunk> abortThunks = new ArrayList<CommitThunk>();
protected double cacheLookupTimeMSecs = 0.0;
private List<CheckedRunnable> commitThunks = new ArrayList<>();
private List<CheckedRunnable> abortThunks = new ArrayList<>();
private Consumer<? super Throwable> exceptionallyThunk;
private List<CompletableFuture<?>> asyncOperationFutures = new ArrayList<CompletableFuture<?>>();
private boolean aborted = false;
private boolean committed = false;
@ -243,10 +246,16 @@ public class UnitOfWork implements AutoCloseable {
return s;
}
private void applyPostCommitFunctions(String what, List<CommitThunk> thunks) {
private void applyPostCommitFunctions(String what, List<CheckedRunnable> thunks, Consumer<? super Throwable> exceptionallyThunk) {
if (!thunks.isEmpty()) {
for (CommitThunk f : thunks) {
f.apply();
for (CheckedRunnable f : thunks) {
try {
f.run();
} catch (Throwable t) {
if (exceptionallyThunk != null) {
exceptionallyThunk.accept(t);
}
}
}
}
}
@ -361,8 +370,7 @@ public class UnitOfWork implements AutoCloseable {
* @return a function from which to chain work that only happens when commit is successful
* @throws HelenusException when the work overlaps with other concurrent writers.
*/
public synchronized PostCommitFunction<Void, Void> commit()
throws HelenusException, TimeoutException {
public synchronized PostCommitFunction<Void, Void> commit() throws HelenusException {
if (isDone()) {
return PostCommitFunction.NULL_ABORT;
@ -392,7 +400,7 @@ public class UnitOfWork implements AutoCloseable {
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("aborted", abortThunks);
applyPostCommitFunctions("aborted", abortThunks, exceptionallyThunk);
});
elapsedTime.stop();
@ -413,7 +421,7 @@ public class UnitOfWork implements AutoCloseable {
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("committed", uow.commitThunks);
applyPostCommitFunctions("committed", uow.commitThunks, exceptionallyThunk);
});
// Merge our statement cache into the session cache if it exists.
@ -485,7 +493,7 @@ public class UnitOfWork implements AutoCloseable {
// Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass);
// T object = ctor.newInstance(new Object[] { String message });
// }
return new PostCommitFunction<Void, Void>(commitThunks, abortThunks, true);
return new PostCommitFunction<Void, Void>(commitThunks, abortThunks, exceptionallyThunk, true);
}
private void addBatched(BatchOperation batchArg) {
@ -518,7 +526,7 @@ public class UnitOfWork implements AutoCloseable {
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("aborted", uow.abortThunks);
applyPostCommitFunctions("aborted", uow.abortThunks, exceptionallyThunk);
uow.abortThunks.clear();
});

View file

@ -65,38 +65,42 @@ public class BatchOperation extends Operation<Long> {
return this;
}
public Long sync() throws TimeoutException {
public Long sync() {
if (operations.size() == 0) return 0L;
final Timer.Context context = requestLatency.time();
try {
batch.setDefaultTimestamp(timestampGenerator.next());
ResultSet resultSet =
this.execute(
sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
if (!resultSet.wasApplied()) {
throw new HelenusException("Failed to apply batch.");
}
batch.setDefaultTimestamp(timestampGenerator.next());
ResultSet resultSet =
this.execute(
sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
if (!resultSet.wasApplied()) {
throw new HelenusException("Failed to apply batch.");
}
} catch (TimeoutException e) {
throw new HelenusException(e);
} finally {
context.stop();
}
return batch.getDefaultTimestamp();
}
public Long sync(UnitOfWork uow) throws TimeoutException {
public Long sync(UnitOfWork uow) {
if (operations.size() == 0) return 0L;
if (uow == null) return sync();
final Timer.Context context = requestLatency.time();
final Stopwatch timer = Stopwatch.createStarted();
try {
uow.recordCacheAndDatabaseOperationCount(0, 1);
batch.setDefaultTimestamp(timestampGenerator.next());
ResultSet resultSet =
this.execute(
sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
if (!resultSet.wasApplied()) {
throw new HelenusException("Failed to apply batch.");
}
uow.recordCacheAndDatabaseOperationCount(0, 1);
batch.setDefaultTimestamp(timestampGenerator.next());
ResultSet resultSet =
this.execute(
sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
if (!resultSet.wasApplied()) {
throw new HelenusException("Failed to apply batch.");
}
} catch (TimeoutException e) {
throw new HelenusException(e);
} finally {
context.stop();
timer.stop();

View file

@ -0,0 +1,6 @@
package net.helenus.support;
@FunctionalInterface
public interface CheckedRunnable {
void run() throws Throwable;
}

View file

@ -76,6 +76,10 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest {
Arrays.equals(q.toArray(new String[5]), new String[] {"1", "2", "3", "4", "5"}));
}
private void throwAnException() throws Throwable {
throw new Exception("oops");
}
@Test
public void testExceptionWithinAndThen() throws Exception {
List<String> q = new ArrayList<String>(5);
@ -83,6 +87,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest {
uow5 = session.begin();
uow4 = session.begin(uow5);
Exception ex = new Exception();
try {
uow3 = session.begin(uow4);
uow1 = session.begin(uow3);
@ -122,7 +127,9 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest {
.orElse(
() -> {
q.add("d");
});
throwAnException();
})
.exceptionally(e -> Assert.assertEquals(e.getMessage(), "oops"));
throw new Exception();
} catch (Exception e) {
uow4.abort();