From 09b06f4ca4e74a8e4fd0b11ba70d7ce3cd83f152 Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Tue, 29 Aug 2017 10:13:29 -0400 Subject: [PATCH] WIP: Fixed the ordering of commit.andThen(() -> {}) to match the nesting and fire iff the outter most UOW has committed. Made UnitOfWork Closable which will abort the work iff it has not previously been committed or aborted. Started to add tests to exercise Helenus UnitOfWork (transactions). --- .../java/net/helenus/core/CommitThunk.java | 8 + .../java/net/helenus/core/HelenusSession.java | 51 +++---- .../net/helenus/core/PostCommitFunction.java | 30 ++++ .../java/net/helenus/core/UnitOfWork.java | 119 +++++++-------- .../core/unitofwork/AndThenOrderTest.java | 141 ++++++++++++++++++ .../core/unitofwork/Directory.java | 30 ++++ .../integration/core/unitofwork/File.java | 27 ++++ .../core/unitofwork/FileAttributes.java | 25 ++++ .../core/unitofwork/FilesystemNode.java | 34 +++++ .../core/unitofwork/TxnConsistencyTest.java | 0 .../core/unitofwork/TxnIsolationTest.java | 0 .../core/unitofwork/UnitOfWorkTest.java | 52 +++++++ 12 files changed, 422 insertions(+), 95 deletions(-) create mode 100644 src/main/java/net/helenus/core/CommitThunk.java create mode 100644 src/main/java/net/helenus/core/PostCommitFunction.java create mode 100644 src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java create mode 100644 src/test/java/net/helenus/test/integration/core/unitofwork/Directory.java create mode 100644 src/test/java/net/helenus/test/integration/core/unitofwork/File.java create mode 100644 src/test/java/net/helenus/test/integration/core/unitofwork/FileAttributes.java create mode 100644 src/test/java/net/helenus/test/integration/core/unitofwork/FilesystemNode.java create mode 100644 src/test/java/net/helenus/test/integration/core/unitofwork/TxnConsistencyTest.java create mode 100644 src/test/java/net/helenus/test/integration/core/unitofwork/TxnIsolationTest.java create mode 100644 src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java diff --git a/src/main/java/net/helenus/core/CommitThunk.java b/src/main/java/net/helenus/core/CommitThunk.java new file mode 100644 index 0000000..c200061 --- /dev/null +++ b/src/main/java/net/helenus/core/CommitThunk.java @@ -0,0 +1,8 @@ +package net.helenus.core; + +import java.util.function.Function; + +@FunctionalInterface +public interface CommitThunk { + void apply(); +} diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index 9c4df6b..f3ab2f6 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -75,22 +75,22 @@ public final class HelenusSession extends AbstractSessionOperations implements C private final CacheManager cacheManager; HelenusSession( - Session session, - String usingKeyspace, - CodecRegistry registry, - boolean showCql, - PrintStream printStream, - SessionRepositoryBuilder sessionRepositoryBuilder, - Executor executor, - boolean dropSchemaOnClose, - ConsistencyLevel consistencyLevel, - MetricRegistry metricRegistry, - Tracer tracer) { + Session session, + String usingKeyspace, + CodecRegistry registry, + boolean showCql, + PrintStream printStream, + SessionRepositoryBuilder sessionRepositoryBuilder, + Executor executor, + boolean dropSchemaOnClose, + ConsistencyLevel consistencyLevel, + MetricRegistry metricRegistry, + Tracer tracer) { this.session = session; this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry; this.usingKeyspace = - Objects.requireNonNull( - usingKeyspace, "keyspace needs to be selected before creating session"); + Objects.requireNonNull( + usingKeyspace, "keyspace needs to be selected before creating session"); this.showCql = showCql; this.printStream = printStream; this.sessionRepository = sessionRepositoryBuilder.build(); @@ -181,28 +181,13 @@ public final class HelenusSession extends AbstractSessionOperations implements C } public synchronized UnitOfWork begin() { - if (currentUnitOfWork == null) { - currentUnitOfWork = new UnitOfWork(this); - return currentUnitOfWork; - } else { - return currentUnitOfWork.begin(); - } + return new UnitOfWork(this, null).begin(); } - public synchronized Function commit() throws ConflictingUnitOfWorkException { - Function f = Function.identity(); - if (currentUnitOfWork != null) { - f = Errors.rethrow().>wrap(currentUnitOfWork::commit).get(); - currentUnitOfWork = null; - } - return f; - } - - public synchronized void abort() { - if (currentUnitOfWork != null) { - currentUnitOfWork.abort(); - currentUnitOfWork = null; - } + public synchronized UnitOfWork begin(UnitOfWork parent) { + UnitOfWork child = new UnitOfWork(this, parent); + parent.addNestedUnitOfWork(child); + return child.begin(); } public SelectOperation select(Class entityClass) { diff --git a/src/main/java/net/helenus/core/PostCommitFunction.java b/src/main/java/net/helenus/core/PostCommitFunction.java new file mode 100644 index 0000000..22835c5 --- /dev/null +++ b/src/main/java/net/helenus/core/PostCommitFunction.java @@ -0,0 +1,30 @@ +package net.helenus.core; + + +import java.util.Objects; +import java.util.*; + +public class PostCommitFunction implements java.util.function.Function { + + private final UnitOfWork uow; + private final List postCommit; + + PostCommitFunction(UnitOfWork uow, List postCommit) { + this.uow = uow; + this.postCommit = postCommit; + } + + public void andThen(CommitThunk after) { + Objects.requireNonNull(after); + if (postCommit == null) { + after.apply(); + } else { + postCommit.add(after); + } + } + + @Override + public R apply(T t) { + return null; + } +} diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 15d8944..7062a1d 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -3,33 +3,24 @@ package net.helenus.core; import com.diffplug.common.base.Errors; import com.google.common.collect.TreeTraverser; +import java.io.Closeable; +import java.io.IOException; import java.util.*; -import java.util.function.Function; + /** Encapsulates the concept of a "transaction" as a unit-of-work. */ -public class UnitOfWork { - - static private final Map all = new HashMap(); - static private final List nested = new ArrayList<>(); - +public final class UnitOfWork implements Closeable { + private final List nested = new ArrayList<>(); private final HelenusSession session; - private ArrayList postCommit = new ArrayList(); + private final UnitOfWork parent; + private List postCommit = new ArrayList(); private boolean aborted = false; private boolean committed = false; - /** - * Marks the beginning of a transactional section of work. Will write a record to the shared - * write-ahead log. - * - * @return the handle used to commit or abort the work. - */ - static UnitOfWork begin(HelenusSession session) { + protected UnitOfWork(HelenusSession session, UnitOfWork parent) { Objects.requireNonNull(session, "containing session cannot be null"); - UnitOfWork uow = new UnitOfWork(session); - synchronized (all) { - all.put(session, uow); - } - return uow; + this.session = session; + this.parent = parent; } /** @@ -38,32 +29,23 @@ public class UnitOfWork { * * @return the handle used to commit or abort the work. */ - static UnitOfWork begin(UnitOfWork parent) { - Objects.requireNonNull(parent, "parent unit of work cannot be null"); - Objects.requireNonNull(all.get(parent), "parent unit of work is not currently active"); - - UnitOfWork uow = new UnitOfWork(parent.session); - synchronized (all) { - all.put(parent.session, uow); - parent.addNestedUnitOfWork(uow); - } - return uow; - } - - private UnitOfWork(HelenusSession session) { - this.session = session; + protected UnitOfWork begin() { // log.record(txn::start) + return this; } - private void addNestedUnitOfWork(UnitOfWork uow) { + protected UnitOfWork addNestedUnitOfWork(UnitOfWork uow) { synchronized (nested) { nested.add(uow); } + return this; } private void applyPostCommitFunctions() { - for(Function f : postCommit) { - f.apply(null); + if (!postCommit.isEmpty()) { + for (CommitThunk f : postCommit) { + f.apply(); + } } } @@ -81,19 +63,33 @@ public class UnitOfWork { // All nested UnitOfWork should be committed (not aborted) before calls to commit, check. boolean canCommit = true; TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); - for (UnitOfWork uow : traverser.postOrderTraversal(this)) { canCommit &= (!uow.aborted && uow.committed); } + for (UnitOfWork uow : traverser.postOrderTraversal(this)) { + if (this != uow) { + canCommit &= (!uow.aborted && uow.committed); + } + } - traverser.postOrderTraversal(this).forEach(uow -> { uow.applyPostCommitFunctions(); }); - - nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit)); // log.record(txn::provisionalCommit) // examine log for conflicts in read-set and write-set between begin and provisional commit // if (conflict) { throw new ConflictingUnitOfWorkException(this) } // else return function so as to enable commit.andThen(() -> { do something iff commit was successful; }) - return new PostCommitFunction(this); - } + if (canCommit) { + committed = true; + aborted = false; + nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit)); + + // Apply all post-commit functions for + if (parent == null) { + traverser.postOrderTraversal(this).forEach(uow -> { + uow.applyPostCommitFunctions(); + }); + return new PostCommitFunction(this, null); + } + } + return new PostCommitFunction(this, postCommit); + } public void rollback() { abort(); @@ -101,34 +97,33 @@ public class UnitOfWork { /** Explicitly discard the work and mark it as as such in the log. */ public void abort() { + TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); + traverser.postOrderTraversal(this).forEach(uow -> { + uow.committed = false; + uow.aborted = true; + }); // log.record(txn::abort) // cache.invalidateSince(txn::start time) - TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); - traverser.postOrderTraversal(this).forEach(uow -> { uow.aborted = true; }); } public String describeConflicts() { return "it's complex..."; } - private class PostCommitFunction implements java.util.function.Function { - - private final UnitOfWork uow; - - PostCommitFunction(UnitOfWork uow) { - this.uow = uow; - } - - @Override - public PostCommitFunction andThen(Function after) { - Objects.requireNonNull(after); - postCommit.add(after); - return null; - } - - @Override - public R apply(T t) { - return null; + @Override + public void close() throws IOException { + // Closing a UnitOfWork will abort iff we've not already aborted or committed this unit of work. + if (aborted == false && committed == false) { + abort(); } } + + public boolean hasAborted() { + return aborted; + } + + public boolean hasCommitted() { + return committed; + } + } diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java new file mode 100644 index 0000000..9d1c065 --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import com.datastax.driver.core.utils.UUIDs; +import net.bytebuddy.utility.RandomString; +import net.helenus.core.Helenus; +import net.helenus.core.HelenusSession; +import net.helenus.core.UnitOfWork; +import net.helenus.mapping.annotation.Column; +import net.helenus.mapping.annotation.PartitionKey; +import net.helenus.mapping.annotation.Table; +import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + + +@Table +interface Widget { + @PartitionKey UUID id(); + @Column String name(); +} + + +public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { + + static Widget widgets; + + static HelenusSession session; + + @BeforeClass + public static void beforeTest() { + session = Helenus.init(getSession()) + .showCql() + .add(Widget.class) + .autoCreateDrop() + .get(); + widgets = session.dsl(Widget.class); + } + + @Test + public void testAndThenOrdering() throws Exception { + List q = new ArrayList(5); + UnitOfWork uow1, uow2, uow3, uow4, uow5; + + uow5 = session.begin(); + uow3 = session.begin(uow5); + uow1 = session.begin(uow3); + uow1.commit().andThen(() -> { q.add("1"); }); + uow2 = session.begin(uow3); + uow2.commit().andThen(() -> { q.add("2"); }); + uow3.commit().andThen(() -> { q.add("3"); }); + uow4 = session.begin(uow5); + uow4.commit().andThen(() -> { q.add("4"); }); + uow5.commit().andThen(() -> { q.add("5"); }); + + System.out.println(q); + Assert.assertTrue(Arrays.equals(q.toArray(new String[5]), new String[] {"1", "2", "3", "4", "5"})); + + } + + @Test + public void testExceptionWithinAndThen() throws Exception { + List q = new ArrayList(5); + UnitOfWork uow1, uow2, uow3, uow4, uow5; + + uow5 = session.begin(); + uow4 = session.begin(uow5); + try { + uow3 = session.begin(uow4); + uow1 = session.begin(uow3); + uow1.commit().andThen(() -> { + q.add("1"); + }); + uow2 = session.begin(uow3); + uow2.commit().andThen(() -> { + q.add("2"); + }); + uow3.commit().andThen(() -> { + q.add("3"); + }); + uow4.commit().andThen(() -> { + q.add("4"); + }); + throw new Exception(); + } catch(Exception e) { + uow4.abort(); + } + uow5.commit().andThen(() -> { q.add("5"); }); + + System.out.println(q); + Assert.assertTrue(q.isEmpty() == true); + + } + + @Test + public void testClosableWillAbortWhenNotCommitted() throws Exception { + UnitOfWork unitOfWork; + try(UnitOfWork uow = session.begin()) { + unitOfWork = uow; + Assert.assertFalse(uow.hasAborted()); + } + Assert.assertTrue(unitOfWork.hasAborted()); + + } + + @Test + public void testClosable() throws Exception { + UnitOfWork unitOfWork; + try(UnitOfWork uow = session.begin()) { + unitOfWork = uow; + Assert.assertFalse(uow.hasAborted()); + uow.commit().andThen(() -> { + Assert.assertFalse(uow.hasAborted()); + Assert.assertTrue(uow.hasCommitted()); + }); + } + Assert.assertFalse(unitOfWork.hasAborted()); + Assert.assertTrue(unitOfWork.hasCommitted()); + } + +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/Directory.java b/src/test/java/net/helenus/test/integration/core/unitofwork/Directory.java new file mode 100644 index 0000000..4984d9d --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/Directory.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import net.helenus.mapping.annotation.*; + +import com.datastax.driver.core.DataType.Name; +import java.util.Set; + + +@UDT +public interface Directory extends FilesystemNode { + + @Types.Set(Name.TIMEUUID) + Set inodes(); + +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/File.java b/src/test/java/net/helenus/test/integration/core/unitofwork/File.java new file mode 100644 index 0000000..4e8ff6f --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/File.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import net.helenus.mapping.annotation.*; + + +@UDT +public interface File extends FilesystemNode { + + @Column + byte[] data(); + +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/FileAttributes.java b/src/test/java/net/helenus/test/integration/core/unitofwork/FileAttributes.java new file mode 100644 index 0000000..6d2db6d --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/FileAttributes.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import net.helenus.mapping.annotation.UDT; + +@UDT +public interface FileAttributes { + + String owner(); + +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/FilesystemNode.java b/src/test/java/net/helenus/test/integration/core/unitofwork/FilesystemNode.java new file mode 100644 index 0000000..a57f296 --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/FilesystemNode.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import net.helenus.mapping.annotation.*; + +import java.util.UUID; + +@Table("fs") +public interface FilesystemNode { + + @PartitionKey + UUID inode(); + + @ClusteringColumn + String name(); + + @Column + FileAttributes attr(); + +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/TxnConsistencyTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/TxnConsistencyTest.java new file mode 100644 index 0000000..e69de29 diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/TxnIsolationTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/TxnIsolationTest.java new file mode 100644 index 0000000..e69de29 diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java new file mode 100644 index 0000000..4c34d44 --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import net.helenus.core.Helenus; +import net.helenus.core.HelenusSession; +import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest; +import net.helenus.test.integration.core.unitofwork.FilesystemNode; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { + + static FilesystemNode node; + + static HelenusSession session; + + @BeforeClass + public static void beforeTest() { + session = Helenus.init(getSession()) + .showCql() + .add(FilesystemNode.class) + .autoCreateDrop() + .get(); + node = session.dsl(FilesystemNode.class); + } + + +/* + @Test + public void testCruid() throws Exception { + session.insert() + .value(widgets::id, UUIDs.timeBased()) + .value(widgets::name, RandomString.make(20)) + .sync(uow5); + } +*/ +}