From 662a697d03928c5a7f94a798ef6612923148a1e0 Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Mon, 2 Apr 2018 11:45:49 -0400 Subject: [PATCH] Changes to andThen/orElse and adding exceptionally logic. On commit one or the other of andThen or orElse will fire based on the path (commit, abort). If during any of those an exception is thrown the exceptionally is called with the exception. --- .../java/net/helenus/core/CommitThunk.java | 6 --- .../net/helenus/core/PostCommitFunction.java | 53 ++++++++++++++----- .../java/net/helenus/core/UnitOfWork.java | 32 ++++++----- .../core/operation/BatchOperation.java | 38 +++++++------ .../net/helenus/support/CheckedRunnable.java | 6 +++ .../core/unitofwork/AndThenOrderTest.java | 9 +++- 6 files changed, 95 insertions(+), 49 deletions(-) delete mode 100644 src/main/java/net/helenus/core/CommitThunk.java create mode 100644 src/main/java/net/helenus/support/CheckedRunnable.java diff --git a/src/main/java/net/helenus/core/CommitThunk.java b/src/main/java/net/helenus/core/CommitThunk.java deleted file mode 100644 index 1ad9e05..0000000 --- a/src/main/java/net/helenus/core/CommitThunk.java +++ /dev/null @@ -1,6 +0,0 @@ -package net.helenus.core; - -@FunctionalInterface -public interface CommitThunk { - void apply(); -} diff --git a/src/main/java/net/helenus/core/PostCommitFunction.java b/src/main/java/net/helenus/core/PostCommitFunction.java index a7e152c..c2f7a8f 100644 --- a/src/main/java/net/helenus/core/PostCommitFunction.java +++ b/src/main/java/net/helenus/core/PostCommitFunction.java @@ -2,48 +2,75 @@ package net.helenus.core; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Function; + +import net.helenus.support.CheckedRunnable; public class PostCommitFunction implements java.util.function.Function { - public static final PostCommitFunction NULL_ABORT = new PostCommitFunction<>(null, null, false); - public static final PostCommitFunction NULL_COMMIT = new PostCommitFunction<>(null, null, true); + public static final PostCommitFunction NULL_ABORT = new PostCommitFunction(null, null, null, false); + public static final PostCommitFunction NULL_COMMIT = new PostCommitFunction(null, null, null, true); - private final List commitThunks; - private final List abortThunks; + private final List commitThunks; + private final List abortThunks; + private Consumer exceptionallyThunk; private boolean committed; - PostCommitFunction( - List postCommit, - List abortThunks, + PostCommitFunction(List postCommit, List abortThunks, + Consumer exceptionallyThunk, boolean committed) { this.commitThunks = postCommit; this.abortThunks = abortThunks; + this.exceptionallyThunk = exceptionallyThunk; this.committed = committed; } - public PostCommitFunction andThen(CommitThunk after) { + private void apply(CheckedRunnable... fns) { + try { + for (CheckedRunnable fn : fns) { + fn.run(); + } + } catch (Throwable t) { + if (exceptionallyThunk != null) { + exceptionallyThunk.accept(t); + } + } + } + + public PostCommitFunction andThen(CheckedRunnable... after) { Objects.requireNonNull(after); if (commitThunks == null) { if (committed) { - after.apply(); + apply(after); } } else { - commitThunks.add(after); + for (CheckedRunnable fn : after) { + commitThunks.add(fn); + } } return this; } - public PostCommitFunction orElse(CommitThunk after) { + public PostCommitFunction orElse(CheckedRunnable... after) { Objects.requireNonNull(after); if (abortThunks == null) { if (!committed) { - after.apply(); + apply(after); } } else { - abortThunks.add(after); + for (CheckedRunnable fn : after) { + abortThunks.add(fn); + } } return this; } + public PostCommitFunction exceptionally(Consumer fn) { + Objects.requireNonNull(fn); + exceptionallyThunk = fn; + return this; + } + @Override public R apply(T t) { return null; diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index dfcdf37..7581298 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.stream.Collectors; import javax.cache.Cache; import javax.cache.CacheManager; @@ -54,6 +55,7 @@ import net.helenus.core.cache.MapCache; import net.helenus.core.operation.AbstractOperation; import net.helenus.core.operation.BatchOperation; import net.helenus.mapping.MappingUtil; +import net.helenus.support.CheckedRunnable; import net.helenus.support.Either; import net.helenus.support.HelenusException; import org.apache.commons.lang3.SerializationUtils; @@ -78,9 +80,10 @@ public class UnitOfWork implements AutoCloseable { protected int databaseLookups = 0; protected final Stopwatch elapsedTime; protected Map databaseTime = new HashMap<>(); - protected double cacheLookupTimeMSecs = 0.0d; - private List commitThunks = new ArrayList(); - private List abortThunks = new ArrayList(); + protected double cacheLookupTimeMSecs = 0.0; + private List commitThunks = new ArrayList<>(); + private List abortThunks = new ArrayList<>(); + private Consumer exceptionallyThunk; private List> asyncOperationFutures = new ArrayList>(); private boolean aborted = false; private boolean committed = false; @@ -243,10 +246,16 @@ public class UnitOfWork implements AutoCloseable { return s; } - private void applyPostCommitFunctions(String what, List thunks) { + private void applyPostCommitFunctions(String what, List thunks, Consumer exceptionallyThunk) { if (!thunks.isEmpty()) { - for (CommitThunk f : thunks) { - f.apply(); + for (CheckedRunnable f : thunks) { + try { + f.run(); + } catch (Throwable t) { + if (exceptionallyThunk != null) { + exceptionallyThunk.accept(t); + } + } } } } @@ -361,8 +370,7 @@ public class UnitOfWork implements AutoCloseable { * @return a function from which to chain work that only happens when commit is successful * @throws HelenusException when the work overlaps with other concurrent writers. */ - public synchronized PostCommitFunction commit() - throws HelenusException, TimeoutException { + public synchronized PostCommitFunction commit() throws HelenusException { if (isDone()) { return PostCommitFunction.NULL_ABORT; @@ -392,7 +400,7 @@ public class UnitOfWork implements AutoCloseable { .postOrderTraversal(this) .forEach( uow -> { - applyPostCommitFunctions("aborted", abortThunks); + applyPostCommitFunctions("aborted", abortThunks, exceptionallyThunk); }); elapsedTime.stop(); @@ -413,7 +421,7 @@ public class UnitOfWork implements AutoCloseable { .postOrderTraversal(this) .forEach( uow -> { - applyPostCommitFunctions("committed", uow.commitThunks); + applyPostCommitFunctions("committed", uow.commitThunks, exceptionallyThunk); }); // Merge our statement cache into the session cache if it exists. @@ -485,7 +493,7 @@ public class UnitOfWork implements AutoCloseable { // Constructor ctor = clazz.getConstructor(conflictExceptionClass); // T object = ctor.newInstance(new Object[] { String message }); // } - return new PostCommitFunction(commitThunks, abortThunks, true); + return new PostCommitFunction(commitThunks, abortThunks, exceptionallyThunk, true); } private void addBatched(BatchOperation batchArg) { @@ -518,7 +526,7 @@ public class UnitOfWork implements AutoCloseable { .postOrderTraversal(this) .forEach( uow -> { - applyPostCommitFunctions("aborted", uow.abortThunks); + applyPostCommitFunctions("aborted", uow.abortThunks, exceptionallyThunk); uow.abortThunks.clear(); }); diff --git a/src/main/java/net/helenus/core/operation/BatchOperation.java b/src/main/java/net/helenus/core/operation/BatchOperation.java index 8aca9cf..b996de3 100644 --- a/src/main/java/net/helenus/core/operation/BatchOperation.java +++ b/src/main/java/net/helenus/core/operation/BatchOperation.java @@ -65,38 +65,42 @@ public class BatchOperation extends Operation { return this; } - public Long sync() throws TimeoutException { + public Long sync() { if (operations.size() == 0) return 0L; final Timer.Context context = requestLatency.time(); try { - batch.setDefaultTimestamp(timestampGenerator.next()); - ResultSet resultSet = - this.execute( - sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false); - if (!resultSet.wasApplied()) { - throw new HelenusException("Failed to apply batch."); - } + batch.setDefaultTimestamp(timestampGenerator.next()); + ResultSet resultSet = + this.execute( + sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false); + if (!resultSet.wasApplied()) { + throw new HelenusException("Failed to apply batch."); + } + } catch (TimeoutException e) { + throw new HelenusException(e); } finally { context.stop(); } return batch.getDefaultTimestamp(); } - public Long sync(UnitOfWork uow) throws TimeoutException { + public Long sync(UnitOfWork uow) { if (operations.size() == 0) return 0L; if (uow == null) return sync(); final Timer.Context context = requestLatency.time(); final Stopwatch timer = Stopwatch.createStarted(); try { - uow.recordCacheAndDatabaseOperationCount(0, 1); - batch.setDefaultTimestamp(timestampGenerator.next()); - ResultSet resultSet = - this.execute( - sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, false); - if (!resultSet.wasApplied()) { - throw new HelenusException("Failed to apply batch."); - } + uow.recordCacheAndDatabaseOperationCount(0, 1); + batch.setDefaultTimestamp(timestampGenerator.next()); + ResultSet resultSet = + this.execute( + sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, false); + if (!resultSet.wasApplied()) { + throw new HelenusException("Failed to apply batch."); + } + } catch (TimeoutException e) { + throw new HelenusException(e); } finally { context.stop(); timer.stop(); diff --git a/src/main/java/net/helenus/support/CheckedRunnable.java b/src/main/java/net/helenus/support/CheckedRunnable.java new file mode 100644 index 0000000..4d70627 --- /dev/null +++ b/src/main/java/net/helenus/support/CheckedRunnable.java @@ -0,0 +1,6 @@ +package net.helenus.support; + +@FunctionalInterface +public interface CheckedRunnable { + void run() throws Throwable; +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java index 5f9b79a..609d2e3 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java @@ -76,6 +76,10 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { Arrays.equals(q.toArray(new String[5]), new String[] {"1", "2", "3", "4", "5"})); } + private void throwAnException() throws Throwable { + throw new Exception("oops"); + } + @Test public void testExceptionWithinAndThen() throws Exception { List q = new ArrayList(5); @@ -83,6 +87,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { uow5 = session.begin(); uow4 = session.begin(uow5); + Exception ex = new Exception(); try { uow3 = session.begin(uow4); uow1 = session.begin(uow3); @@ -122,7 +127,9 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { .orElse( () -> { q.add("d"); - }); + throwAnException(); + }) + .exceptionally(e -> Assert.assertEquals(e.getMessage(), "oops")); throw new Exception(); } catch (Exception e) { uow4.abort();