diff --git a/src/main/java/net/helenus/core/CommitThunk.java b/src/main/java/net/helenus/core/CommitThunk.java new file mode 100644 index 0000000..c200061 --- /dev/null +++ b/src/main/java/net/helenus/core/CommitThunk.java @@ -0,0 +1,8 @@ +package net.helenus.core; + +import java.util.function.Function; + +@FunctionalInterface +public interface CommitThunk { + void apply(); +} diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index 9c4df6b..f3ab2f6 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -75,22 +75,22 @@ public final class HelenusSession extends AbstractSessionOperations implements C private final CacheManager cacheManager; HelenusSession( - Session session, - String usingKeyspace, - CodecRegistry registry, - boolean showCql, - PrintStream printStream, - SessionRepositoryBuilder sessionRepositoryBuilder, - Executor executor, - boolean dropSchemaOnClose, - ConsistencyLevel consistencyLevel, - MetricRegistry metricRegistry, - Tracer tracer) { + Session session, + String usingKeyspace, + CodecRegistry registry, + boolean showCql, + PrintStream printStream, + SessionRepositoryBuilder sessionRepositoryBuilder, + Executor executor, + boolean dropSchemaOnClose, + ConsistencyLevel consistencyLevel, + MetricRegistry metricRegistry, + Tracer tracer) { this.session = session; this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry; this.usingKeyspace = - Objects.requireNonNull( - usingKeyspace, "keyspace needs to be selected before creating session"); + Objects.requireNonNull( + usingKeyspace, "keyspace needs to be selected before creating session"); this.showCql = showCql; this.printStream = printStream; this.sessionRepository = sessionRepositoryBuilder.build(); @@ -181,28 +181,13 @@ public final class HelenusSession extends AbstractSessionOperations implements C } public synchronized UnitOfWork begin() { - if (currentUnitOfWork == null) { - currentUnitOfWork = new UnitOfWork(this); - return currentUnitOfWork; - } else { - return currentUnitOfWork.begin(); - } + return new UnitOfWork(this, null).begin(); } - public synchronized Function commit() throws ConflictingUnitOfWorkException { - Function f = Function.identity(); - if (currentUnitOfWork != null) { - f = Errors.rethrow().>wrap(currentUnitOfWork::commit).get(); - currentUnitOfWork = null; - } - return f; - } - - public synchronized void abort() { - if (currentUnitOfWork != null) { - currentUnitOfWork.abort(); - currentUnitOfWork = null; - } + public synchronized UnitOfWork begin(UnitOfWork parent) { + UnitOfWork child = new UnitOfWork(this, parent); + parent.addNestedUnitOfWork(child); + return child.begin(); } public SelectOperation select(Class entityClass) { diff --git a/src/main/java/net/helenus/core/PostCommitFunction.java b/src/main/java/net/helenus/core/PostCommitFunction.java new file mode 100644 index 0000000..22835c5 --- /dev/null +++ b/src/main/java/net/helenus/core/PostCommitFunction.java @@ -0,0 +1,30 @@ +package net.helenus.core; + + +import java.util.Objects; +import java.util.*; + +public class PostCommitFunction implements java.util.function.Function { + + private final UnitOfWork uow; + private final List postCommit; + + PostCommitFunction(UnitOfWork uow, List postCommit) { + this.uow = uow; + this.postCommit = postCommit; + } + + public void andThen(CommitThunk after) { + Objects.requireNonNull(after); + if (postCommit == null) { + after.apply(); + } else { + postCommit.add(after); + } + } + + @Override + public R apply(T t) { + return null; + } +} diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 15d8944..7062a1d 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -3,33 +3,24 @@ package net.helenus.core; import com.diffplug.common.base.Errors; import com.google.common.collect.TreeTraverser; +import java.io.Closeable; +import java.io.IOException; import java.util.*; -import java.util.function.Function; + /** Encapsulates the concept of a "transaction" as a unit-of-work. */ -public class UnitOfWork { - - static private final Map all = new HashMap(); - static private final List nested = new ArrayList<>(); - +public final class UnitOfWork implements Closeable { + private final List nested = new ArrayList<>(); private final HelenusSession session; - private ArrayList postCommit = new ArrayList(); + private final UnitOfWork parent; + private List postCommit = new ArrayList(); private boolean aborted = false; private boolean committed = false; - /** - * Marks the beginning of a transactional section of work. Will write a record to the shared - * write-ahead log. - * - * @return the handle used to commit or abort the work. - */ - static UnitOfWork begin(HelenusSession session) { + protected UnitOfWork(HelenusSession session, UnitOfWork parent) { Objects.requireNonNull(session, "containing session cannot be null"); - UnitOfWork uow = new UnitOfWork(session); - synchronized (all) { - all.put(session, uow); - } - return uow; + this.session = session; + this.parent = parent; } /** @@ -38,32 +29,23 @@ public class UnitOfWork { * * @return the handle used to commit or abort the work. */ - static UnitOfWork begin(UnitOfWork parent) { - Objects.requireNonNull(parent, "parent unit of work cannot be null"); - Objects.requireNonNull(all.get(parent), "parent unit of work is not currently active"); - - UnitOfWork uow = new UnitOfWork(parent.session); - synchronized (all) { - all.put(parent.session, uow); - parent.addNestedUnitOfWork(uow); - } - return uow; - } - - private UnitOfWork(HelenusSession session) { - this.session = session; + protected UnitOfWork begin() { // log.record(txn::start) + return this; } - private void addNestedUnitOfWork(UnitOfWork uow) { + protected UnitOfWork addNestedUnitOfWork(UnitOfWork uow) { synchronized (nested) { nested.add(uow); } + return this; } private void applyPostCommitFunctions() { - for(Function f : postCommit) { - f.apply(null); + if (!postCommit.isEmpty()) { + for (CommitThunk f : postCommit) { + f.apply(); + } } } @@ -81,19 +63,33 @@ public class UnitOfWork { // All nested UnitOfWork should be committed (not aborted) before calls to commit, check. boolean canCommit = true; TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); - for (UnitOfWork uow : traverser.postOrderTraversal(this)) { canCommit &= (!uow.aborted && uow.committed); } + for (UnitOfWork uow : traverser.postOrderTraversal(this)) { + if (this != uow) { + canCommit &= (!uow.aborted && uow.committed); + } + } - traverser.postOrderTraversal(this).forEach(uow -> { uow.applyPostCommitFunctions(); }); - - nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit)); // log.record(txn::provisionalCommit) // examine log for conflicts in read-set and write-set between begin and provisional commit // if (conflict) { throw new ConflictingUnitOfWorkException(this) } // else return function so as to enable commit.andThen(() -> { do something iff commit was successful; }) - return new PostCommitFunction(this); - } + if (canCommit) { + committed = true; + aborted = false; + nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit)); + + // Apply all post-commit functions for + if (parent == null) { + traverser.postOrderTraversal(this).forEach(uow -> { + uow.applyPostCommitFunctions(); + }); + return new PostCommitFunction(this, null); + } + } + return new PostCommitFunction(this, postCommit); + } public void rollback() { abort(); @@ -101,34 +97,33 @@ public class UnitOfWork { /** Explicitly discard the work and mark it as as such in the log. */ public void abort() { + TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); + traverser.postOrderTraversal(this).forEach(uow -> { + uow.committed = false; + uow.aborted = true; + }); // log.record(txn::abort) // cache.invalidateSince(txn::start time) - TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); - traverser.postOrderTraversal(this).forEach(uow -> { uow.aborted = true; }); } public String describeConflicts() { return "it's complex..."; } - private class PostCommitFunction implements java.util.function.Function { - - private final UnitOfWork uow; - - PostCommitFunction(UnitOfWork uow) { - this.uow = uow; - } - - @Override - public PostCommitFunction andThen(Function after) { - Objects.requireNonNull(after); - postCommit.add(after); - return null; - } - - @Override - public R apply(T t) { - return null; + @Override + public void close() throws IOException { + // Closing a UnitOfWork will abort iff we've not already aborted or committed this unit of work. + if (aborted == false && committed == false) { + abort(); } } + + public boolean hasAborted() { + return aborted; + } + + public boolean hasCommitted() { + return committed; + } + } diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java new file mode 100644 index 0000000..9d1c065 --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/AndThenOrderTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import com.datastax.driver.core.utils.UUIDs; +import net.bytebuddy.utility.RandomString; +import net.helenus.core.Helenus; +import net.helenus.core.HelenusSession; +import net.helenus.core.UnitOfWork; +import net.helenus.mapping.annotation.Column; +import net.helenus.mapping.annotation.PartitionKey; +import net.helenus.mapping.annotation.Table; +import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + + +@Table +interface Widget { + @PartitionKey UUID id(); + @Column String name(); +} + + +public class AndThenOrderTest extends AbstractEmbeddedCassandraTest { + + static Widget widgets; + + static HelenusSession session; + + @BeforeClass + public static void beforeTest() { + session = Helenus.init(getSession()) + .showCql() + .add(Widget.class) + .autoCreateDrop() + .get(); + widgets = session.dsl(Widget.class); + } + + @Test + public void testAndThenOrdering() throws Exception { + List q = new ArrayList(5); + UnitOfWork uow1, uow2, uow3, uow4, uow5; + + uow5 = session.begin(); + uow3 = session.begin(uow5); + uow1 = session.begin(uow3); + uow1.commit().andThen(() -> { q.add("1"); }); + uow2 = session.begin(uow3); + uow2.commit().andThen(() -> { q.add("2"); }); + uow3.commit().andThen(() -> { q.add("3"); }); + uow4 = session.begin(uow5); + uow4.commit().andThen(() -> { q.add("4"); }); + uow5.commit().andThen(() -> { q.add("5"); }); + + System.out.println(q); + Assert.assertTrue(Arrays.equals(q.toArray(new String[5]), new String[] {"1", "2", "3", "4", "5"})); + + } + + @Test + public void testExceptionWithinAndThen() throws Exception { + List q = new ArrayList(5); + UnitOfWork uow1, uow2, uow3, uow4, uow5; + + uow5 = session.begin(); + uow4 = session.begin(uow5); + try { + uow3 = session.begin(uow4); + uow1 = session.begin(uow3); + uow1.commit().andThen(() -> { + q.add("1"); + }); + uow2 = session.begin(uow3); + uow2.commit().andThen(() -> { + q.add("2"); + }); + uow3.commit().andThen(() -> { + q.add("3"); + }); + uow4.commit().andThen(() -> { + q.add("4"); + }); + throw new Exception(); + } catch(Exception e) { + uow4.abort(); + } + uow5.commit().andThen(() -> { q.add("5"); }); + + System.out.println(q); + Assert.assertTrue(q.isEmpty() == true); + + } + + @Test + public void testClosableWillAbortWhenNotCommitted() throws Exception { + UnitOfWork unitOfWork; + try(UnitOfWork uow = session.begin()) { + unitOfWork = uow; + Assert.assertFalse(uow.hasAborted()); + } + Assert.assertTrue(unitOfWork.hasAborted()); + + } + + @Test + public void testClosable() throws Exception { + UnitOfWork unitOfWork; + try(UnitOfWork uow = session.begin()) { + unitOfWork = uow; + Assert.assertFalse(uow.hasAborted()); + uow.commit().andThen(() -> { + Assert.assertFalse(uow.hasAborted()); + Assert.assertTrue(uow.hasCommitted()); + }); + } + Assert.assertFalse(unitOfWork.hasAborted()); + Assert.assertTrue(unitOfWork.hasCommitted()); + } + +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/Directory.java b/src/test/java/net/helenus/test/integration/core/unitofwork/Directory.java new file mode 100644 index 0000000..4984d9d --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/Directory.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import net.helenus.mapping.annotation.*; + +import com.datastax.driver.core.DataType.Name; +import java.util.Set; + + +@UDT +public interface Directory extends FilesystemNode { + + @Types.Set(Name.TIMEUUID) + Set inodes(); + +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/File.java b/src/test/java/net/helenus/test/integration/core/unitofwork/File.java new file mode 100644 index 0000000..4e8ff6f --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/File.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import net.helenus.mapping.annotation.*; + + +@UDT +public interface File extends FilesystemNode { + + @Column + byte[] data(); + +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/FileAttributes.java b/src/test/java/net/helenus/test/integration/core/unitofwork/FileAttributes.java new file mode 100644 index 0000000..6d2db6d --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/FileAttributes.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import net.helenus.mapping.annotation.UDT; + +@UDT +public interface FileAttributes { + + String owner(); + +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/FilesystemNode.java b/src/test/java/net/helenus/test/integration/core/unitofwork/FilesystemNode.java new file mode 100644 index 0000000..a57f296 --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/FilesystemNode.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import net.helenus.mapping.annotation.*; + +import java.util.UUID; + +@Table("fs") +public interface FilesystemNode { + + @PartitionKey + UUID inode(); + + @ClusteringColumn + String name(); + + @Column + FileAttributes attr(); + +} diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/TxnConsistencyTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/TxnConsistencyTest.java new file mode 100644 index 0000000..e69de29 diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/TxnIsolationTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/TxnIsolationTest.java new file mode 100644 index 0000000..e69de29 diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java new file mode 100644 index 0000000..4c34d44 --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.test.integration.core.unitofwork; + +import net.helenus.core.Helenus; +import net.helenus.core.HelenusSession; +import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest; +import net.helenus.test.integration.core.unitofwork.FilesystemNode; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { + + static FilesystemNode node; + + static HelenusSession session; + + @BeforeClass + public static void beforeTest() { + session = Helenus.init(getSession()) + .showCql() + .add(FilesystemNode.class) + .autoCreateDrop() + .get(); + node = session.dsl(FilesystemNode.class); + } + + +/* + @Test + public void testCruid() throws Exception { + session.insert() + .value(widgets::id, UUIDs.timeBased()) + .value(widgets::name, RandomString.make(20)) + .sync(uow5); + } +*/ +}