WIP: Fixed the ordering of commit.andThen(() -> {}) to match the nesting and fire iff the outter most UOW has committed. Made UnitOfWork Closable which will abort the work iff it has not previously been committed or aborted. Started to add tests to exercise Helenus UnitOfWork (transactions).

This commit is contained in:
Greg Burd 2017-08-29 10:13:29 -04:00
parent c35d6d19d1
commit 09b06f4ca4
12 changed files with 422 additions and 95 deletions

View file

@ -0,0 +1,8 @@
package net.helenus.core;
import java.util.function.Function;
@FunctionalInterface
public interface CommitThunk {
void apply();
}

View file

@ -75,22 +75,22 @@ public final class HelenusSession extends AbstractSessionOperations implements C
private final CacheManager cacheManager;
HelenusSession(
Session session,
String usingKeyspace,
CodecRegistry registry,
boolean showCql,
PrintStream printStream,
SessionRepositoryBuilder sessionRepositoryBuilder,
Executor executor,
boolean dropSchemaOnClose,
ConsistencyLevel consistencyLevel,
MetricRegistry metricRegistry,
Tracer tracer) {
Session session,
String usingKeyspace,
CodecRegistry registry,
boolean showCql,
PrintStream printStream,
SessionRepositoryBuilder sessionRepositoryBuilder,
Executor executor,
boolean dropSchemaOnClose,
ConsistencyLevel consistencyLevel,
MetricRegistry metricRegistry,
Tracer tracer) {
this.session = session;
this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry;
this.usingKeyspace =
Objects.requireNonNull(
usingKeyspace, "keyspace needs to be selected before creating session");
Objects.requireNonNull(
usingKeyspace, "keyspace needs to be selected before creating session");
this.showCql = showCql;
this.printStream = printStream;
this.sessionRepository = sessionRepositoryBuilder.build();
@ -181,28 +181,13 @@ public final class HelenusSession extends AbstractSessionOperations implements C
}
public synchronized UnitOfWork begin() {
if (currentUnitOfWork == null) {
currentUnitOfWork = new UnitOfWork(this);
return currentUnitOfWork;
} else {
return currentUnitOfWork.begin();
}
return new UnitOfWork(this, null).begin();
}
public synchronized Function<Void, Void> commit() throws ConflictingUnitOfWorkException {
Function<Void, Void> f = Function.<Void>identity();
if (currentUnitOfWork != null) {
f = Errors.rethrow().<Function<Void, Void>>wrap(currentUnitOfWork::commit).get();
currentUnitOfWork = null;
}
return f;
}
public synchronized void abort() {
if (currentUnitOfWork != null) {
currentUnitOfWork.abort();
currentUnitOfWork = null;
}
public synchronized UnitOfWork begin(UnitOfWork parent) {
UnitOfWork child = new UnitOfWork(this, parent);
parent.addNestedUnitOfWork(child);
return child.begin();
}
public <E> SelectOperation<E> select(Class<E> entityClass) {

View file

@ -0,0 +1,30 @@
package net.helenus.core;
import java.util.Objects;
import java.util.*;
public class PostCommitFunction<T, R> implements java.util.function.Function<T, R> {
private final UnitOfWork uow;
private final List<CommitThunk> postCommit;
PostCommitFunction(UnitOfWork uow, List<CommitThunk> postCommit) {
this.uow = uow;
this.postCommit = postCommit;
}
public void andThen(CommitThunk after) {
Objects.requireNonNull(after);
if (postCommit == null) {
after.apply();
} else {
postCommit.add(after);
}
}
@Override
public R apply(T t) {
return null;
}
}

View file

@ -3,33 +3,24 @@ package net.helenus.core;
import com.diffplug.common.base.Errors;
import com.google.common.collect.TreeTraverser;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.function.Function;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public class UnitOfWork {
static private final Map<HelenusSession, UnitOfWork> all = new HashMap<HelenusSession, UnitOfWork>();
static private final List<UnitOfWork> nested = new ArrayList<>();
public final class UnitOfWork implements Closeable {
private final List<UnitOfWork> nested = new ArrayList<>();
private final HelenusSession session;
private ArrayList<Function> postCommit = new ArrayList<Function>();
private final UnitOfWork parent;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
private boolean aborted = false;
private boolean committed = false;
/**
* Marks the beginning of a transactional section of work. Will write a record to the shared
* write-ahead log.
*
* @return the handle used to commit or abort the work.
*/
static UnitOfWork begin(HelenusSession session) {
protected UnitOfWork(HelenusSession session, UnitOfWork parent) {
Objects.requireNonNull(session, "containing session cannot be null");
UnitOfWork uow = new UnitOfWork(session);
synchronized (all) {
all.put(session, uow);
}
return uow;
this.session = session;
this.parent = parent;
}
/**
@ -38,32 +29,23 @@ public class UnitOfWork {
*
* @return the handle used to commit or abort the work.
*/
static UnitOfWork begin(UnitOfWork parent) {
Objects.requireNonNull(parent, "parent unit of work cannot be null");
Objects.requireNonNull(all.get(parent), "parent unit of work is not currently active");
UnitOfWork uow = new UnitOfWork(parent.session);
synchronized (all) {
all.put(parent.session, uow);
parent.addNestedUnitOfWork(uow);
}
return uow;
}
private UnitOfWork(HelenusSession session) {
this.session = session;
protected UnitOfWork begin() {
// log.record(txn::start)
return this;
}
private void addNestedUnitOfWork(UnitOfWork uow) {
protected UnitOfWork addNestedUnitOfWork(UnitOfWork uow) {
synchronized (nested) {
nested.add(uow);
}
return this;
}
private void applyPostCommitFunctions() {
for(Function f : postCommit) {
f.apply(null);
if (!postCommit.isEmpty()) {
for (CommitThunk f : postCommit) {
f.apply();
}
}
}
@ -81,19 +63,33 @@ public class UnitOfWork {
// All nested UnitOfWork should be committed (not aborted) before calls to commit, check.
boolean canCommit = true;
TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
for (UnitOfWork uow : traverser.postOrderTraversal(this)) { canCommit &= (!uow.aborted && uow.committed); }
for (UnitOfWork uow : traverser.postOrderTraversal(this)) {
if (this != uow) {
canCommit &= (!uow.aborted && uow.committed);
}
}
traverser.postOrderTraversal(this).forEach(uow -> { uow.applyPostCommitFunctions(); });
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
// log.record(txn::provisionalCommit)
// examine log for conflicts in read-set and write-set between begin and provisional commit
// if (conflict) { throw new ConflictingUnitOfWorkException(this) }
// else return function so as to enable commit.andThen(() -> { do something iff commit was successful; })
return new PostCommitFunction<Void, Void>(this);
}
if (canCommit) {
committed = true;
aborted = false;
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
// Apply all post-commit functions for
if (parent == null) {
traverser.postOrderTraversal(this).forEach(uow -> {
uow.applyPostCommitFunctions();
});
return new PostCommitFunction(this, null);
}
}
return new PostCommitFunction(this, postCommit);
}
public void rollback() {
abort();
@ -101,34 +97,33 @@ public class UnitOfWork {
/** Explicitly discard the work and mark it as as such in the log. */
public void abort() {
TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
traverser.postOrderTraversal(this).forEach(uow -> {
uow.committed = false;
uow.aborted = true;
});
// log.record(txn::abort)
// cache.invalidateSince(txn::start time)
TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
traverser.postOrderTraversal(this).forEach(uow -> { uow.aborted = true; });
}
public String describeConflicts() {
return "it's complex...";
}
private class PostCommitFunction<T, R> implements java.util.function.Function<T, R> {
private final UnitOfWork uow;
PostCommitFunction(UnitOfWork uow) {
this.uow = uow;
}
@Override
public <V> PostCommitFunction<T, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
postCommit.add(after);
return null;
}
@Override
public R apply(T t) {
return null;
@Override
public void close() throws IOException {
// Closing a UnitOfWork will abort iff we've not already aborted or committed this unit of work.
if (aborted == false && committed == false) {
abort();
}
}
public boolean hasAborted() {
return aborted;
}
public boolean hasCommitted() {
return committed;
}
}

View file

@ -0,0 +1,141 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.test.integration.core.unitofwork;
import com.datastax.driver.core.utils.UUIDs;
import net.bytebuddy.utility.RandomString;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.UnitOfWork;
import net.helenus.mapping.annotation.Column;
import net.helenus.mapping.annotation.PartitionKey;
import net.helenus.mapping.annotation.Table;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
@Table
interface Widget {
@PartitionKey UUID id();
@Column String name();
}
public class AndThenOrderTest extends AbstractEmbeddedCassandraTest {
static Widget widgets;
static HelenusSession session;
@BeforeClass
public static void beforeTest() {
session = Helenus.init(getSession())
.showCql()
.add(Widget.class)
.autoCreateDrop()
.get();
widgets = session.dsl(Widget.class);
}
@Test
public void testAndThenOrdering() throws Exception {
List<String> q = new ArrayList<String>(5);
UnitOfWork uow1, uow2, uow3, uow4, uow5;
uow5 = session.begin();
uow3 = session.begin(uow5);
uow1 = session.begin(uow3);
uow1.commit().andThen(() -> { q.add("1"); });
uow2 = session.begin(uow3);
uow2.commit().andThen(() -> { q.add("2"); });
uow3.commit().andThen(() -> { q.add("3"); });
uow4 = session.begin(uow5);
uow4.commit().andThen(() -> { q.add("4"); });
uow5.commit().andThen(() -> { q.add("5"); });
System.out.println(q);
Assert.assertTrue(Arrays.equals(q.toArray(new String[5]), new String[] {"1", "2", "3", "4", "5"}));
}
@Test
public void testExceptionWithinAndThen() throws Exception {
List<String> q = new ArrayList<String>(5);
UnitOfWork uow1, uow2, uow3, uow4, uow5;
uow5 = session.begin();
uow4 = session.begin(uow5);
try {
uow3 = session.begin(uow4);
uow1 = session.begin(uow3);
uow1.commit().andThen(() -> {
q.add("1");
});
uow2 = session.begin(uow3);
uow2.commit().andThen(() -> {
q.add("2");
});
uow3.commit().andThen(() -> {
q.add("3");
});
uow4.commit().andThen(() -> {
q.add("4");
});
throw new Exception();
} catch(Exception e) {
uow4.abort();
}
uow5.commit().andThen(() -> { q.add("5"); });
System.out.println(q);
Assert.assertTrue(q.isEmpty() == true);
}
@Test
public void testClosableWillAbortWhenNotCommitted() throws Exception {
UnitOfWork unitOfWork;
try(UnitOfWork uow = session.begin()) {
unitOfWork = uow;
Assert.assertFalse(uow.hasAborted());
}
Assert.assertTrue(unitOfWork.hasAborted());
}
@Test
public void testClosable() throws Exception {
UnitOfWork unitOfWork;
try(UnitOfWork uow = session.begin()) {
unitOfWork = uow;
Assert.assertFalse(uow.hasAborted());
uow.commit().andThen(() -> {
Assert.assertFalse(uow.hasAborted());
Assert.assertTrue(uow.hasCommitted());
});
}
Assert.assertFalse(unitOfWork.hasAborted());
Assert.assertTrue(unitOfWork.hasCommitted());
}
}

View file

@ -0,0 +1,30 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.test.integration.core.unitofwork;
import net.helenus.mapping.annotation.*;
import com.datastax.driver.core.DataType.Name;
import java.util.Set;
@UDT
public interface Directory extends FilesystemNode {
@Types.Set(Name.TIMEUUID)
Set<FilesystemNode> inodes();
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.test.integration.core.unitofwork;
import net.helenus.mapping.annotation.*;
@UDT
public interface File extends FilesystemNode {
@Column
byte[] data();
}

View file

@ -0,0 +1,25 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.test.integration.core.unitofwork;
import net.helenus.mapping.annotation.UDT;
@UDT
public interface FileAttributes {
String owner();
}

View file

@ -0,0 +1,34 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.test.integration.core.unitofwork;
import net.helenus.mapping.annotation.*;
import java.util.UUID;
@Table("fs")
public interface FilesystemNode {
@PartitionKey
UUID inode();
@ClusteringColumn
String name();
@Column
FileAttributes attr();
}

View file

@ -0,0 +1,52 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.test.integration.core.unitofwork;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
import net.helenus.test.integration.core.unitofwork.FilesystemNode;
import org.junit.BeforeClass;
import org.junit.Test;
public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
static FilesystemNode node;
static HelenusSession session;
@BeforeClass
public static void beforeTest() {
session = Helenus.init(getSession())
.showCql()
.add(FilesystemNode.class)
.autoCreateDrop()
.get();
node = session.dsl(FilesystemNode.class);
}
/*
@Test
public void testCruid() throws Exception {
session.insert()
.value(widgets::id, UUIDs.timeBased())
.value(widgets::name, RandomString.make(20))
.sync(uow5);
}
*/
}