diff --git a/src/main/java/net/helenus/core/CommitThunk.java b/src/main/java/net/helenus/core/CommitThunk.java deleted file mode 100644 index 1ad9e05..0000000 --- a/src/main/java/net/helenus/core/CommitThunk.java +++ /dev/null @@ -1,6 +0,0 @@ -package net.helenus.core; - -@FunctionalInterface -public interface CommitThunk { - void apply(); -} diff --git a/src/main/java/net/helenus/core/PostCommitFunction.java b/src/main/java/net/helenus/core/PostCommitFunction.java index a7e152c..c2f7a8f 100644 --- a/src/main/java/net/helenus/core/PostCommitFunction.java +++ b/src/main/java/net/helenus/core/PostCommitFunction.java @@ -2,48 +2,75 @@ package net.helenus.core; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Function; + +import net.helenus.support.CheckedRunnable; public class PostCommitFunction implements java.util.function.Function { - public static final PostCommitFunction NULL_ABORT = new PostCommitFunction<>(null, null, false); - public static final PostCommitFunction NULL_COMMIT = new PostCommitFunction<>(null, null, true); + public static final PostCommitFunction NULL_ABORT = new PostCommitFunction(null, null, null, false); + public static final PostCommitFunction NULL_COMMIT = new PostCommitFunction(null, null, null, true); - private final List commitThunks; - private final List abortThunks; + private final List commitThunks; + private final List abortThunks; + private Consumer exceptionallyThunk; private boolean committed; - PostCommitFunction( - List postCommit, - List abortThunks, + PostCommitFunction(List postCommit, List abortThunks, + Consumer exceptionallyThunk, boolean committed) { this.commitThunks = postCommit; this.abortThunks = abortThunks; + this.exceptionallyThunk = exceptionallyThunk; this.committed = committed; } - public PostCommitFunction andThen(CommitThunk after) { + private void apply(CheckedRunnable... fns) { + try { + for (CheckedRunnable fn : fns) { + fn.run(); + } + } catch (Throwable t) { + if (exceptionallyThunk != null) { + exceptionallyThunk.accept(t); + } + } + } + + public PostCommitFunction andThen(CheckedRunnable... after) { Objects.requireNonNull(after); if (commitThunks == null) { if (committed) { - after.apply(); + apply(after); } } else { - commitThunks.add(after); + for (CheckedRunnable fn : after) { + commitThunks.add(fn); + } } return this; } - public PostCommitFunction orElse(CommitThunk after) { + public PostCommitFunction orElse(CheckedRunnable... after) { Objects.requireNonNull(after); if (abortThunks == null) { if (!committed) { - after.apply(); + apply(after); } } else { - abortThunks.add(after); + for (CheckedRunnable fn : after) { + abortThunks.add(fn); + } } return this; } + public PostCommitFunction exceptionally(Consumer fn) { + Objects.requireNonNull(fn); + exceptionallyThunk = fn; + return this; + } + @Override public R apply(T t) { return null; diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index dfcdf37..7581298 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.stream.Collectors; import javax.cache.Cache; import javax.cache.CacheManager; @@ -54,6 +55,7 @@ import net.helenus.core.cache.MapCache; import net.helenus.core.operation.AbstractOperation; import net.helenus.core.operation.BatchOperation; import net.helenus.mapping.MappingUtil; +import net.helenus.support.CheckedRunnable; import net.helenus.support.Either; import net.helenus.support.HelenusException; import org.apache.commons.lang3.SerializationUtils; @@ -78,9 +80,10 @@ public class UnitOfWork implements AutoCloseable { protected int databaseLookups = 0; protected final Stopwatch elapsedTime; protected Map databaseTime = new HashMap<>(); - protected double cacheLookupTimeMSecs = 0.0d; - private List commitThunks = new ArrayList(); - private List abortThunks = new ArrayList(); + protected double cacheLookupTimeMSecs = 0.0; + private List commitThunks = new ArrayList<>(); + private List abortThunks = new ArrayList<>(); + private Consumer exceptionallyThunk; private List> asyncOperationFutures = new ArrayList>(); private boolean aborted = false; private boolean committed = false; @@ -243,10 +246,16 @@ public class UnitOfWork implements AutoCloseable { return s; } - private void applyPostCommitFunctions(String what, List thunks) { + private void applyPostCommitFunctions(String what, List thunks, Consumer exceptionallyThunk) { if (!thunks.isEmpty()) { - for (CommitThunk f : thunks) { - f.apply(); + for (CheckedRunnable f : thunks) { + try { + f.run(); + } catch (Throwable t) { + if (exceptionallyThunk != null) { + exceptionallyThunk.accept(t); + } + } } } } @@ -361,8 +370,7 @@ public class UnitOfWork implements AutoCloseable { * @return a function from which to chain work that only happens when commit is successful * @throws HelenusException when the work overlaps with other concurrent writers. */ - public synchronized PostCommitFunction commit() - throws HelenusException, TimeoutException { + public synchronized PostCommitFunction commit() throws HelenusException { if (isDone()) { return PostCommitFunction.NULL_ABORT; @@ -392,7 +400,7 @@ public class UnitOfWork implements AutoCloseable { .postOrderTraversal(this) .forEach( uow -> { - applyPostCommitFunctions("aborted", abortThunks); + applyPostCommitFunctions("aborted", abortThunks, exceptionallyThunk); }); elapsedTime.stop(); @@ -413,7 +421,7 @@ public class UnitOfWork implements AutoCloseable { .postOrderTraversal(this) .forEach( uow -> { - applyPostCommitFunctions("committed", uow.commitThunks); + applyPostCommitFunctions("committed", uow.commitThunks, exceptionallyThunk); }); // Merge our statement cache into the session cache if it exists. @@ -485,7 +493,7 @@ public class UnitOfWork implements AutoCloseable { // Constructor ctor = clazz.getConstructor(conflictExceptionClass); // T object = ctor.newInstance(new Object[] { String message }); // } - return new PostCommitFunction(commitThunks, abortThunks, true); + return new PostCommitFunction(commitThunks, abortThunks, exceptionallyThunk, true); } private void addBatched(BatchOperation batchArg) { @@ -518,7 +526,7 @@ public class UnitOfWork implements AutoCloseable { .postOrderTraversal(this) .forEach( uow -> { - applyPostCommitFunctions("aborted", uow.abortThunks); + applyPostCommitFunctions("aborted", uow.abortThunks, exceptionallyThunk); uow.abortThunks.clear(); }); diff --git a/src/main/java/net/helenus/core/operation/BatchOperation.java b/src/main/java/net/helenus/core/operation/BatchOperation.java index 8aca9cf..b996de3 100644 --- a/src/main/java/net/helenus/core/operation/BatchOperation.java +++ b/src/main/java/net/helenus/core/operation/BatchOperation.java @@ -65,38 +65,42 @@ public class BatchOperation extends Operation { return this; } - public Long sync() throws TimeoutException { + public Long sync() { if (operations.size() == 0) return 0L; final Timer.Context context = requestLatency.time(); try { - batch.setDefaultTimestamp(timestampGenerator.next()); - ResultSet resultSet = - this.execute( - sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false); - if (!resultSet.wasApplied()) { - throw new HelenusException("Failed to apply batch."); - } + batch.setDefaultTimestamp(timestampGenerator.next()); + ResultSet resultSet = + this.execute( + sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false); + if (!resultSet.wasApplied()) { + throw new HelenusException("Failed to apply batch."); + } + } catch (TimeoutException e) { + throw new HelenusException(e); } finally { context.stop(); } return batch.getDefaultTimestamp(); } - public Long sync(UnitOfWork uow) throws TimeoutException { + public Long sync(UnitOfWork uow) { if (operations.size() == 0) return 0L; if (uow == null) return sync(); final Timer.Context context = requestLatency.time(); final Stopwatch timer = Stopwatch.createStarted(); try { - uow.recordCacheAndDatabaseOperationCount(0, 1); - batch.setDefaultTimestamp(timestampGenerator.next()); - ResultSet resultSet = - this.execute( - sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, false); - if (!resultSet.wasApplied()) { - throw new HelenusException("Failed to apply batch."); - } + uow.recordCacheAndDatabaseOperationCount(0, 1); + batch.setDefaultTimestamp(timestampGenerator.next()); + ResultSet resultSet = + this.execute( + sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, false); + if (!resultSet.wasApplied()) { + throw new HelenusException("Failed to apply batch."); + } + } catch (TimeoutException e) { + throw new HelenusException(e); } finally { context.stop(); timer.stop(); diff --git a/src/main/java/net/helenus/support/CheckedRunnable.java b/src/main/java/net/helenus/support/CheckedRunnable.java new file mode 100644 index 0000000..4d70627 --- /dev/null +++ b/src/main/java/net/helenus/support/CheckedRunnable.java @@ -0,0 +1,6 @@ +package net.helenus.support; + +@FunctionalInterface +public interface CheckedRunnable { + void run() throws Throwable; +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java index 5f9b79a..609d2e3 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java @@ -76,6 +76,10 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { Arrays.equals(q.toArray(new String[5]), new String[] {"1", "2", "3", "4", "5"})); } + private void throwAnException() throws Throwable { + throw new Exception("oops"); + } + @Test public void testExceptionWithinAndThen() throws Exception { List q = new ArrayList(5); @@ -83,6 +87,7 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { uow5 = session.begin(); uow4 = session.begin(uow5); + Exception ex = new Exception(); try { uow3 = session.begin(uow4); uow1 = session.begin(uow3); @@ -122,7 +127,9 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { .orElse( () -> { q.add("d"); - }); + throwAnException(); + }) + .exceptionally(e -> Assert.assertEquals(e.getMessage(), "oops")); throw new Exception(); } catch (Exception e) { uow4.abort();