diff --git a/pom.xml b/pom.xml
index a5fe826..dac750d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
net.helenus
helenus-core
- 2.0.39-SNAPSHOT
+ 2.0.40-SNAPSHOT
jar
helenus
diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java
new file mode 100644
index 0000000..dfb1ff7
--- /dev/null
+++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright (C) 2015 The Helenus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.helenus.core;
+
+import com.diffplug.common.base.Errors;
+import com.google.common.collect.TreeTraverser;
+
+import java.util.*;
+
+
+/** Encapsulates the concept of a "transaction" as a unit-of-work. */
+public abstract class AbstractUnitOfWork implements UnitOfWork, AutoCloseable {
+ private final List> nested = new ArrayList<>();
+ private final HelenusSession session;
+ private final AbstractUnitOfWork parent;
+ private List postCommit = new ArrayList();
+ private final Map> cache = new HashMap>();
+ private boolean aborted = false;
+ private boolean committed = false;
+
+ protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork parent) {
+ Objects.requireNonNull(session, "containing session cannot be null");
+
+ this.session = session;
+ this.parent = parent;
+ }
+
+ public UnitOfWork addNestedUnitOfWork(UnitOfWork uow) {
+ synchronized (nested) {
+ nested.add((AbstractUnitOfWork) uow);
+ }
+ return this;
+ }
+
+ public UnitOfWork begin() {
+ // log.record(txn::start)
+ return this;
+ }
+
+ private void applyPostCommitFunctions() {
+ if (!postCommit.isEmpty()) {
+ for (CommitThunk f : postCommit) {
+ f.apply();
+ }
+ }
+ }
+
+ public Set cacheLookup(String key) {
+ Set r = getCache().get(key);
+ if (r != null) {
+ return r;
+ } else {
+ if (parent != null) {
+ return parent.cacheLookup(key);
+ }
+ }
+ return null;
+ }
+
+ public Map> getCache() { return cache; }
+
+ private Iterator> getChildNodes() {
+ return nested.iterator();
+ }
+
+ /**
+ * Checks to see if the work performed between calling begin and now can be committed or not.
+ *
+ * @return a function from which to chain work that only happens when commit is successful
+ * @throws E when the work overlaps with other concurrent writers.
+ */
+ public PostCommitFunction commit() throws E {
+ // All nested UnitOfWork should be committed (not aborted) before calls to commit, check.
+ boolean canCommit = true;
+ TreeTraverser> traverser = TreeTraverser.using(node -> node::getChildNodes);
+ for (AbstractUnitOfWork uow : traverser.postOrderTraversal(this)) {
+ if (this != uow) {
+ canCommit &= (!uow.aborted && uow.committed);
+ }
+ }
+
+ // log.record(txn::provisionalCommit)
+ // examine log for conflicts in read-set and write-set between begin and provisional commit
+ // if (conflict) { throw new ConflictingUnitOfWorkException(this) }
+ // else return function so as to enable commit.andThen(() -> { do something iff commit was successful; })
+
+ if (canCommit) {
+ committed = true;
+ aborted = false;
+
+ // TODO(gburd): union this cache with parent's (if there is a parent) or with the session cache for all cacheable entities we currently hold
+
+ nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
+
+ // Merge UOW cache into parent's cache.
+ if (parent != null) {
+ Map> parentCache = parent.getCache();
+ for (String key : cache.keySet()) {
+ if (parentCache.containsKey(key)) {
+ // merge the sets
+ Set ps = parentCache.get(key);
+ ps.addAll(cache.get(key)); //TODO(gburd): review this, likely not correct in all cases as-is.
+ } else {
+ // add the missing set
+ parentCache.put(key, cache.get(key));
+ }
+ }
+ }
+
+ // Apply all post-commit functions for
+ if (parent == null) {
+ traverser.postOrderTraversal(this).forEach(uow -> {
+ uow.applyPostCommitFunctions();
+ });
+ return new PostCommitFunction(this, null);
+ }
+ }
+ // else {
+ // Constructor ctor = clazz.getConstructor(conflictExceptionClass);
+ // T object = ctor.newInstance(new Object[] { String message });
+ // }
+ return new PostCommitFunction(this, postCommit);
+ }
+
+ /* Explicitly discard the work and mark it as as such in the log. */
+ public void abort() {
+ TreeTraverser> traverser = TreeTraverser.using(node -> node::getChildNodes);
+ traverser.postOrderTraversal(this).forEach(uow -> {
+ uow.committed = false;
+ uow.aborted = true;
+ });
+ // log.record(txn::abort)
+ // cache.invalidateSince(txn::start time)
+ }
+
+ public String describeConflicts() {
+ return "it's complex...";
+ }
+
+ @Override
+ public void close() throws E {
+ // Closing a AbstractUnitOfWork will abort iff we've not already aborted or committed this unit of work.
+ if (aborted == false && committed == false) {
+ abort();
+ }
+ }
+
+ public boolean hasAborted() {
+ return aborted;
+ }
+
+ public boolean hasCommitted() {
+ return committed;
+ }
+
+}
diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java
index ff63e98..e2a2104 100644
--- a/src/main/java/net/helenus/core/HelenusSession.java
+++ b/src/main/java/net/helenus/core/HelenusSession.java
@@ -30,10 +30,13 @@ import net.helenus.support.Fun;
import net.helenus.support.Fun.Tuple1;
import net.helenus.support.Fun.Tuple2;
import net.helenus.support.Fun.Tuple6;
+import net.helenus.support.HelenusException;
import net.helenus.support.HelenusMappingException;
import java.io.Closeable;
import java.io.PrintStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -56,6 +59,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
private final MetricRegistry metricRegistry;
private final Tracer zipkinTracer;
private final PrintStream printStream;
+ private final Class extends UnitOfWork> unitOfWorkClass;
private final SessionRepository sessionRepository;
private final Executor executor;
private final boolean dropSchemaOnClose;
@@ -75,7 +79,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
Executor executor,
boolean dropSchemaOnClose,
ConsistencyLevel consistencyLevel,
- Class extends Exception> conflictExceptionClass,
+ Class extends UnitOfWork> unitOfWorkClass,
MetricRegistry metricRegistry,
Tracer tracer) {
this.session = session;
@@ -89,7 +93,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
this.executor = executor;
this.dropSchemaOnClose = dropSchemaOnClose;
this.defaultConsistencyLevel = consistencyLevel;
- UnitOfWork.conflictExceptionClass = conflictExceptionClass;
+ this.unitOfWorkClass = unitOfWorkClass;
this.metricRegistry = metricRegistry;
this.zipkinTracer = tracer;
@@ -172,12 +176,23 @@ public final class HelenusSession extends AbstractSessionOperations implements C
return metadata;
}
- public synchronized UnitOfWork begin() { return new UnitOfWork(this, null).begin(); }
+ public synchronized UnitOfWork begin() {
+ return begin(null);
+ }
- public synchronized UnitOfWork begin(UnitOfWork parent) {
- UnitOfWork child = new UnitOfWork(this, parent);
- parent.addNestedUnitOfWork(child);
- return child.begin();
+ public synchronized UnitOfWork begin(UnitOfWork parent) {
+ try {
+ Class extends UnitOfWork> clazz = unitOfWorkClass;
+ Constructor extends UnitOfWork> ctor = clazz.getConstructor(HelenusSession.class, UnitOfWork.class);
+ UnitOfWork uow = ctor.newInstance(this, parent);
+ if (parent != null) {
+ parent.addNestedUnitOfWork(uow);
+ }
+ return uow.begin();
+ }
+ catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
+ throw new HelenusException(String.format("Unable to instantiate {} as a UnitOfWork.", unitOfWorkClass.getSimpleName()), e);
+ }
}
public SelectOperation select(E pojo) {
diff --git a/src/main/java/net/helenus/core/SessionInitializer.java b/src/main/java/net/helenus/core/SessionInitializer.java
index 0a58e7a..677eb1b 100644
--- a/src/main/java/net/helenus/core/SessionInitializer.java
+++ b/src/main/java/net/helenus/core/SessionInitializer.java
@@ -43,7 +43,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
private Tracer zipkinTracer;
private PrintStream printStream = System.out;
private Executor executor = MoreExecutors.directExecutor();
- private Class extends Exception> conflictingUnitOfWorkClass = ConflictingUnitOfWorkException.class;
+ private Class extends UnitOfWork> unitOfWorkClass = UnitOfWorkImpl.class;
private SessionRepositoryBuilder sessionRepository;
@@ -111,8 +111,8 @@ public final class SessionInitializer extends AbstractSessionOperations {
return this;
}
- public SessionInitializer setConflictingUnitOfWorkException(Class extends Exception> e) {
- this.conflictingUnitOfWorkClass = e;
+ public SessionInitializer setUnitOfWorkClass(Class extends UnitOfWork> e) {
+ this.unitOfWorkClass = e;
return this;
}
@@ -241,7 +241,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
executor,
autoDdl == AutoDdl.CREATE_DROP,
consistencyLevel,
- conflictingUnitOfWorkClass,
+ unitOfWorkClass,
metricRegistry,
zipkinTracer);
}
diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java
index c947c12..0cfa536 100644
--- a/src/main/java/net/helenus/core/UnitOfWork.java
+++ b/src/main/java/net/helenus/core/UnitOfWork.java
@@ -1,166 +1,58 @@
+/*
+ * Copyright (C) 2015 The Helenus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package net.helenus.core;
-import com.diffplug.common.base.Errors;
-import com.google.common.collect.TreeTraverser;
-import net.helenus.support.HelenusException;
+import net.helenus.support.Either;
-import java.util.*;
+import java.util.Map;
+import java.util.Set;
+
+public interface UnitOfWork extends AutoCloseable {
+
+ /**
+ * Marks the beginning of a transactional section of work. Will write a record to the shared
+ * write-ahead log.
+ *
+ * @return the handle used to commit or abort the work.
+ */
+ UnitOfWork begin();
+
+ UnitOfWork addNestedUnitOfWork(UnitOfWork uow);
+
+ /**
+ * Checks to see if the work performed between calling begin and now can be committed or not.
+ *
+ * @return a function from which to chain work that only happens when commit is successful
+ * @throws E when the work overlaps with other concurrent writers.
+ */
+ PostCommitFunction commit() throws E;
+
+ /**
+ * Explicitly abort the work within this unit of work. Any nested aborted unit of work
+ * will trigger the entire unit of work to commit.
+ */
+ void abort();
-/** Encapsulates the concept of a "transaction" as a unit-of-work. */
-public class UnitOfWork implements AutoCloseable {
- protected static Class extends Exception>conflictExceptionClass = ConflictingUnitOfWorkException.class;
- private final List nested = new ArrayList<>();
- private final HelenusSession session;
- private final UnitOfWork parent;
- private List postCommit = new ArrayList();
- private final Map> cache = new HashMap>();
- private boolean aborted = false;
- private boolean committed = false;
+ boolean hasAborted();
- protected UnitOfWork(HelenusSession session, UnitOfWork parent) {
- Objects.requireNonNull(session, "containing session cannot be null");
+ boolean hasCommitted();
- this.session = session;
- this.parent = parent;
- }
-
- /**
- * Marks the beginning of a transactional section of work. Will write a record to the shared
- * write-ahead log.
- *
- * @return the handle used to commit or abort the work.
- */
- protected UnitOfWork begin() {
- // log.record(txn::start)
- return this;
- }
-
- protected UnitOfWork addNestedUnitOfWork(UnitOfWork uow) {
- synchronized (nested) {
- nested.add(uow);
- }
- return this;
- }
-
- private void applyPostCommitFunctions() {
- if (!postCommit.isEmpty()) {
- for (CommitThunk f : postCommit) {
- f.apply();
- }
- }
- }
-
- public Set cacheLookup(String key) {
- Set r = getCache().get(key);
- if (r != null) {
- return r;
- } else {
- if (parent != null) {
- return parent.cacheLookup(key);
- }
- }
- return null;
- }
-
- public Map> getCache() { return cache; }
-
- private Iterator getChildNodes() {
- return nested.iterator();
- }
-
- /**
- * Checks to see if the work performed between calling begin and now can be committed or not.
- *
- * @return a function from which to chain work that only happens when commit is successful
- * @throws T when the work overlaps with other concurrent writers.
- */
- public PostCommitFunction commit() throws T {
- // All nested UnitOfWork should be committed (not aborted) before calls to commit, check.
- boolean canCommit = true;
- TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes);
- for (UnitOfWork uow : traverser.postOrderTraversal(this)) {
- if (this != uow) {
- canCommit &= (!uow.aborted && uow.committed);
- }
- }
-
- // log.record(txn::provisionalCommit)
- // examine log for conflicts in read-set and write-set between begin and provisional commit
- // if (conflict) { throw new ConflictingUnitOfWorkException(this) }
- // else return function so as to enable commit.andThen(() -> { do something iff commit was successful; })
-
- if (canCommit) {
- committed = true;
- aborted = false;
-
- // TODO(gburd): union this cache with parent's (if there is a parent) or with the session cache for all cacheable entities we currently hold
-
- nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
-
- // Merge UOW cache into parent's cache.
- if (parent != null) {
- Map> parentCache = parent.getCache();
- for (String key : cache.keySet()) {
- if (parentCache.containsKey(key)) {
- // merge the sets
- Set ps = parentCache.get(key);
- ps.addAll(cache.get(key)); //TODO(gburd): review this, likely not correct in all cases as-is.
- } else {
- // add the missing set
- parentCache.put(key, cache.get(key));
- }
- }
- }
-
- // Apply all post-commit functions for
- if (parent == null) {
- traverser.postOrderTraversal(this).forEach(uow -> {
- uow.applyPostCommitFunctions();
- });
- return new PostCommitFunction(this, null);
- }
- }
- // else {
- // Constructor ctor = clazz.getConstructor(conflictExceptionClass);
- // T object = ctor.newInstance(new Object[] { String message });
- // }
- return new PostCommitFunction(this, postCommit);
- }
-
- public void rollback() {
- abort();
- }
-
- /* Explicitly discard the work and mark it as as such in the log. */
- public void abort() {
- TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes);
- traverser.postOrderTraversal(this).forEach(uow -> {
- uow.committed = false;
- uow.aborted = true;
- });
- // log.record(txn::abort)
- // cache.invalidateSince(txn::start time)
- }
-
- public String describeConflicts() {
- return "it's complex...";
- }
-
- @Override
- public void close() throws HelenusException {
- // Closing a UnitOfWork will abort iff we've not already aborted or committed this unit of work.
- if (aborted == false && committed == false) {
- abort();
- }
- }
-
- public boolean hasAborted() {
- return aborted;
- }
-
- public boolean hasCommitted() {
- return committed;
- }
+ //Either> cacheLookup(String key);
+ Set cacheLookup(String key);
+ Map> getCache();
}
diff --git a/src/main/java/net/helenus/core/UnitOfWorkImpl.java b/src/main/java/net/helenus/core/UnitOfWorkImpl.java
new file mode 100644
index 0000000..b9aab3b
--- /dev/null
+++ b/src/main/java/net/helenus/core/UnitOfWorkImpl.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2015 The Helenus Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package net.helenus.core;
+
+import net.helenus.support.HelenusException;
+
+class UnitOfWorkImpl extends AbstractUnitOfWork {
+
+ @SuppressWarnings("unchecked")
+ public UnitOfWorkImpl(HelenusSession session, UnitOfWork parent) {
+ super(session, (AbstractUnitOfWork) parent);
+ }
+
+}
diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java
index 81a9588..05403d1 100644
--- a/src/main/java/net/helenus/core/operation/SelectOperation.java
+++ b/src/main/java/net/helenus/core/operation/SelectOperation.java
@@ -49,11 +49,12 @@ public final class SelectOperation extends AbstractFilterStreamOperation() {
+ new Function() {
@Override
public E apply(Row source) {
diff --git a/src/main/java/net/helenus/mapping/value/UDTColumnValueProvider.java b/src/main/java/net/helenus/mapping/value/UDTColumnValueProvider.java
index 4fbffbf..5b617b0 100644
--- a/src/main/java/net/helenus/mapping/value/UDTColumnValueProvider.java
+++ b/src/main/java/net/helenus/mapping/value/UDTColumnValueProvider.java
@@ -34,6 +34,7 @@ public final class UDTColumnValueProvider implements ColumnValueProvider {
}
@Override
+ @SuppressWarnings("unchecked")
public V getColumnValue(Object sourceObj, int columnIndexUnused, HelenusProperty property) {
UDTValue source = (UDTValue) sourceObj;