WIP: commit.exceptionally() is working but somehow in the process I broke commit.andThen().

This commit is contained in:
Greg Burd 2017-11-14 22:37:37 -05:00
parent 33b4b35912
commit 9df97b3e44
3 changed files with 100 additions and 39 deletions

View file

@ -56,7 +56,8 @@ public abstract class AbstractUnitOfWork<E extends Exception>
protected Stopwatch elapsedTime; protected Stopwatch elapsedTime;
protected Map<String, Double> databaseTime = new HashMap<>(); protected Map<String, Double> databaseTime = new HashMap<>();
protected double cacheLookupTime = 0.0; protected double cacheLookupTime = 0.0;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>(); private List<CommitThunk> commitThunks = new ArrayList<CommitThunk>();
private List<CommitThunk> abortThunks = new ArrayList<CommitThunk>();
private boolean aborted = false; private boolean aborted = false;
private boolean committed = false; private boolean committed = false;
private long committedAt = 0L; private long committedAt = 0L;
@ -186,14 +187,14 @@ public abstract class AbstractUnitOfWork<E extends Exception>
return s; return s;
} }
private void applyPostCommitFunctions() { private void applyPostCommitFunctions(String what, List<CommitThunk> thunks) {
if (!postCommit.isEmpty()) { if (!thunks.isEmpty()) {
for (CommitThunk f : postCommit) { for (CommitThunk f : thunks) {
f.apply(); f.apply();
} }
} }
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
LOG.info(logTimers("committed")); LOG.info(logTimers(what));
} }
} }
@ -308,11 +309,6 @@ public abstract class AbstractUnitOfWork<E extends Exception>
*/ */
public PostCommitFunction<Void, Void> commit() throws E, TimeoutException { public PostCommitFunction<Void, Void> commit() throws E, TimeoutException {
if (batch != null) {
committedAt = batch.sync(this);
//TODO(gburd) update cache with writeTime...
}
// All nested UnitOfWork should be committed (not aborted) before calls to // All nested UnitOfWork should be committed (not aborted) before calls to
// commit, check. // commit, check.
boolean canCommit = true; boolean canCommit = true;
@ -324,7 +320,28 @@ public abstract class AbstractUnitOfWork<E extends Exception>
} }
} }
if (canCommit) { if (!canCommit) {
nested.forEach((uow) -> Errors.rethrow().wrap(uow::abort));
elapsedTime.stop();
if (parent == null) {
// Apply all post-commit abort functions, this is the outter-most UnitOfWork.
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("aborted", abortThunks);
});
}
return new PostCommitFunction(this, null, null, false);
} else {
// Only the outter-most UOW batches statements for commit time, execute them.
if (batch != null) {
committedAt = batch.sync(this); //TODO(gburd): update cache with writeTime...
}
committed = true; committed = true;
aborted = false; aborted = false;
@ -332,18 +349,19 @@ public abstract class AbstractUnitOfWork<E extends Exception>
elapsedTime.stop(); elapsedTime.stop();
if (parent == null) { if (parent == null) {
// Apply all post-commit functions, this is the outter-most UnitOfWork.
// Apply all post-commit commit functions, this is the outter-most UnitOfWork.
traverser traverser
.postOrderTraversal(this) .postOrderTraversal(this)
.forEach( .forEach(
uow -> { uow -> {
uow.applyPostCommitFunctions(); applyPostCommitFunctions("committed", commitThunks);
}); });
// Merge our cache into the session cache. // Merge our cache into the session cache.
session.mergeCache(cache); session.mergeCache(cache);
return new PostCommitFunction(this, null); return new PostCommitFunction(this, null, null, true);
} else { } else {
// Merge cache and statistics into parent if there is one. // Merge cache and statistics into parent if there is one.
@ -371,7 +389,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
// Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass); // Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass);
// T object = ctor.newInstance(new Object[] { String message }); // T object = ctor.newInstance(new Object[] { String message });
// } // }
return new PostCommitFunction(this, postCommit); return new PostCommitFunction(this, commitThunks, abortThunks, true);
} }
private void addBatched(BatchOperation batch) { private void addBatched(BatchOperation batch) {
@ -384,22 +402,21 @@ public abstract class AbstractUnitOfWork<E extends Exception>
/* Explicitly discard the work and mark it as as such in the log. */ /* Explicitly discard the work and mark it as as such in the log. */
public synchronized void abort() { public synchronized void abort() {
if (!aborted) {
aborted = true;
TreeTraverser<AbstractUnitOfWork<E>> traverser = TreeTraverser<AbstractUnitOfWork<E>> traverser =
TreeTraverser.using(node -> node::getChildNodes); TreeTraverser.using(node -> node::getChildNodes);
traverser traverser
.postOrderTraversal(this) .postOrderTraversal(this)
.forEach( .forEach(
uow -> { uow -> {
uow.committed = false; applyPostCommitFunctions("aborted", uow.abortThunks);
uow.aborted = true; uow.abortThunks.clear();
}); });
// log.record(txn::abort) // log.record(txn::abort)
// cache.invalidateSince(txn::start time) // cache.invalidateSince(txn::start time)
if (LOG.isInfoEnabled()) {
if (elapsedTime.isRunning()) {
elapsedTime.stop();
}
LOG.info(logTimers("aborted"));
} }
} }

View file

@ -6,20 +6,43 @@ import java.util.Objects;
public class PostCommitFunction<T, R> implements java.util.function.Function<T, R> { public class PostCommitFunction<T, R> implements java.util.function.Function<T, R> {
private final UnitOfWork uow; private final UnitOfWork uow;
private final List<CommitThunk> postCommit; private final List<CommitThunk> commitThunks;
private final List<CommitThunk> abortThunks;
private boolean committed;
PostCommitFunction(UnitOfWork uow, List<CommitThunk> postCommit) { PostCommitFunction(
UnitOfWork uow,
List<CommitThunk> postCommit,
List<CommitThunk> abortThunks,
boolean committed) {
this.uow = uow; this.uow = uow;
this.postCommit = postCommit; this.commitThunks = postCommit;
this.abortThunks = abortThunks;
this.committed = committed;
} }
public void andThen(CommitThunk after) { public PostCommitFunction<T, R> andThen(CommitThunk after) {
Objects.requireNonNull(after); Objects.requireNonNull(after);
if (postCommit == null) { if (commitThunks == null) {
if (committed) {
after.apply(); after.apply();
} else {
postCommit.add(after);
} }
} else {
commitThunks.add(after);
}
return this;
}
public PostCommitFunction<T, R> exceptionally(CommitThunk after) {
Objects.requireNonNull(after);
if (abortThunks == null) {
if (!committed) {
after.apply();
}
} else {
abortThunks.add(after);
}
return this;
} }
@Override @Override

View file

@ -90,22 +90,38 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest {
.andThen( .andThen(
() -> { () -> {
q.add("1"); q.add("1");
})
.exceptionally(
() -> {
q.add("a");
}); });
uow2 = session.begin(uow3); uow2 = session.begin(uow3);
uow2.commit() uow2.commit()
.andThen( .andThen(
() -> { () -> {
q.add("2"); q.add("2");
})
.exceptionally(
() -> {
q.add("b");
}); });
uow3.commit() uow3.commit()
.andThen( .andThen(
() -> { () -> {
q.add("3"); q.add("3");
})
.exceptionally(
() -> {
q.add("c");
}); });
uow4.commit() uow4.commit()
.andThen( .andThen(
() -> { () -> {
q.add("4"); q.add("4");
})
.exceptionally(
() -> {
q.add("d");
}); });
throw new Exception(); throw new Exception();
} catch (Exception e) { } catch (Exception e) {
@ -115,10 +131,15 @@ public class AndThenOrderTest extends AbstractEmbeddedCassandraTest {
.andThen( .andThen(
() -> { () -> {
q.add("5"); q.add("5");
})
.exceptionally(
() -> {
q.add("e");
}); });
System.out.println(q); System.out.println(q);
Assert.assertTrue(q.isEmpty() == true); Assert.assertTrue(
Arrays.equals(q.toArray(new String[5]), new String[] {"a", "b", "c", "d", "e"}));
} }
@Test @Test