From 9df97b3e447580cc8db7900d3a760f56c294677d Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Tue, 14 Nov 2017 22:37:37 -0500 Subject: [PATCH] WIP: commit.exceptionally() is working but somehow in the process I broke commit.andThen(). --- .../net/helenus/core/AbstractUnitOfWork.java | 79 +++++++++++-------- .../net/helenus/core/PostCommitFunction.java | 37 +++++++-- .../core/unitofwork/AndThenOrderTest.java | 23 +++++- 3 files changed, 100 insertions(+), 39 deletions(-) diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index b9f5b27..7f5e6f1 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -56,7 +56,8 @@ public abstract class AbstractUnitOfWork protected Stopwatch elapsedTime; protected Map databaseTime = new HashMap<>(); protected double cacheLookupTime = 0.0; - private List postCommit = new ArrayList(); + private List commitThunks = new ArrayList(); + private List abortThunks = new ArrayList(); private boolean aborted = false; private boolean committed = false; private long committedAt = 0L; @@ -186,14 +187,14 @@ public abstract class AbstractUnitOfWork return s; } - private void applyPostCommitFunctions() { - if (!postCommit.isEmpty()) { - for (CommitThunk f : postCommit) { + private void applyPostCommitFunctions(String what, List thunks) { + if (!thunks.isEmpty()) { + for (CommitThunk f : thunks) { f.apply(); } } if (LOG.isInfoEnabled()) { - LOG.info(logTimers("committed")); + LOG.info(logTimers(what)); } } @@ -308,11 +309,6 @@ public abstract class AbstractUnitOfWork */ public PostCommitFunction commit() throws E, TimeoutException { - if (batch != null) { - committedAt = batch.sync(this); - //TODO(gburd) update cache with writeTime... - } - // All nested UnitOfWork should be committed (not aborted) before calls to // commit, check. boolean canCommit = true; @@ -324,7 +320,28 @@ public abstract class AbstractUnitOfWork } } - if (canCommit) { + if (!canCommit) { + nested.forEach((uow) -> Errors.rethrow().wrap(uow::abort)); + elapsedTime.stop(); + + if (parent == null) { + + // Apply all post-commit abort functions, this is the outter-most UnitOfWork. + traverser + .postOrderTraversal(this) + .forEach( + uow -> { + applyPostCommitFunctions("aborted", abortThunks); + }); + } + + return new PostCommitFunction(this, null, null, false); + } else { + // Only the outter-most UOW batches statements for commit time, execute them. + if (batch != null) { + committedAt = batch.sync(this); //TODO(gburd): update cache with writeTime... + } + committed = true; aborted = false; @@ -332,18 +349,19 @@ public abstract class AbstractUnitOfWork elapsedTime.stop(); if (parent == null) { - // Apply all post-commit functions, this is the outter-most UnitOfWork. + + // Apply all post-commit commit functions, this is the outter-most UnitOfWork. traverser .postOrderTraversal(this) .forEach( uow -> { - uow.applyPostCommitFunctions(); + applyPostCommitFunctions("committed", commitThunks); }); // Merge our cache into the session cache. session.mergeCache(cache); - return new PostCommitFunction(this, null); + return new PostCommitFunction(this, null, null, true); } else { // Merge cache and statistics into parent if there is one. @@ -371,7 +389,7 @@ public abstract class AbstractUnitOfWork // Constructor ctor = clazz.getConstructor(conflictExceptionClass); // T object = ctor.newInstance(new Object[] { String message }); // } - return new PostCommitFunction(this, postCommit); + return new PostCommitFunction(this, commitThunks, abortThunks, true); } private void addBatched(BatchOperation batch) { @@ -384,22 +402,21 @@ public abstract class AbstractUnitOfWork /* Explicitly discard the work and mark it as as such in the log. */ public synchronized void abort() { - TreeTraverser> traverser = - TreeTraverser.using(node -> node::getChildNodes); - traverser - .postOrderTraversal(this) - .forEach( - uow -> { - uow.committed = false; - uow.aborted = true; - }); - // log.record(txn::abort) - // cache.invalidateSince(txn::start time) - if (LOG.isInfoEnabled()) { - if (elapsedTime.isRunning()) { - elapsedTime.stop(); - } - LOG.info(logTimers("aborted")); + if (!aborted) { + aborted = true; + + TreeTraverser> traverser = + TreeTraverser.using(node -> node::getChildNodes); + traverser + .postOrderTraversal(this) + .forEach( + uow -> { + applyPostCommitFunctions("aborted", uow.abortThunks); + uow.abortThunks.clear(); + }); + + // log.record(txn::abort) + // cache.invalidateSince(txn::start time) } } diff --git a/src/main/java/net/helenus/core/PostCommitFunction.java b/src/main/java/net/helenus/core/PostCommitFunction.java index 5521304..c1be72d 100644 --- a/src/main/java/net/helenus/core/PostCommitFunction.java +++ b/src/main/java/net/helenus/core/PostCommitFunction.java @@ -6,20 +6,43 @@ import java.util.Objects; public class PostCommitFunction implements java.util.function.Function { private final UnitOfWork uow; - private final List postCommit; + private final List commitThunks; + private final List abortThunks; + private boolean committed; - PostCommitFunction(UnitOfWork uow, List postCommit) { + PostCommitFunction( + UnitOfWork uow, + List postCommit, + List abortThunks, + boolean committed) { this.uow = uow; - this.postCommit = postCommit; + this.commitThunks = postCommit; + this.abortThunks = abortThunks; + this.committed = committed; } - public void andThen(CommitThunk after) { + public PostCommitFunction andThen(CommitThunk after) { Objects.requireNonNull(after); - if (postCommit == null) { - after.apply(); + if (commitThunks == null) { + if (committed) { + after.apply(); + } } else { - postCommit.add(after); + commitThunks.add(after); } + return this; + } + + public PostCommitFunction exceptionally(CommitThunk after) { + Objects.requireNonNull(after); + if (abortThunks == null) { + if (!committed) { + after.apply(); + } + } else { + abortThunks.add(after); + } + return this; } @Override diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java index 2b3ebcd..4872e41 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java @@ -90,22 +90,38 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { .andThen( () -> { q.add("1"); + }) + .exceptionally( + () -> { + q.add("a"); }); uow2 = session.begin(uow3); uow2.commit() .andThen( () -> { q.add("2"); + }) + .exceptionally( + () -> { + q.add("b"); }); uow3.commit() .andThen( () -> { q.add("3"); + }) + .exceptionally( + () -> { + q.add("c"); }); uow4.commit() .andThen( () -> { q.add("4"); + }) + .exceptionally( + () -> { + q.add("d"); }); throw new Exception(); } catch (Exception e) { @@ -115,10 +131,15 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { .andThen( () -> { q.add("5"); + }) + .exceptionally( + () -> { + q.add("e"); }); System.out.println(q); - Assert.assertTrue(q.isEmpty() == true); + Assert.assertTrue( + Arrays.equals(q.toArray(new String[5]), new String[] {"a", "b", "c", "d", "e"})); } @Test