From a7094abdfa0715bc998fb093b4a9922a60a160c0 Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Wed, 20 Sep 2017 14:52:08 -0400 Subject: [PATCH] Re-work API for UnitOfWork so as to be able to instantiate custom sub-classes if need be. --- pom.xml | 2 +- .../net/helenus/core/AbstractUnitOfWork.java | 169 ++++++++++++++ .../java/net/helenus/core/HelenusSession.java | 29 ++- .../net/helenus/core/SessionInitializer.java | 8 +- .../java/net/helenus/core/UnitOfWork.java | 206 +++++------------- .../java/net/helenus/core/UnitOfWorkImpl.java | 27 +++ .../core/operation/InsertOperation.java | 1 + .../core/operation/SelectOperation.java | 3 +- .../core/operation/UpdateOperation.java | 1 + .../mapping/value/UDTColumnValueProvider.java | 1 + 10 files changed, 277 insertions(+), 170 deletions(-) create mode 100644 src/main/java/net/helenus/core/AbstractUnitOfWork.java create mode 100644 src/main/java/net/helenus/core/UnitOfWorkImpl.java diff --git a/pom.xml b/pom.xml index a5fe826..566445e 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 net.helenus helenus-core - 2.0.39-SNAPSHOT + 2.0.41-SNAPSHOT jar helenus diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java new file mode 100644 index 0000000..dfb1ff7 --- /dev/null +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -0,0 +1,169 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.core; + +import com.diffplug.common.base.Errors; +import com.google.common.collect.TreeTraverser; + +import java.util.*; + + +/** Encapsulates the concept of a "transaction" as a unit-of-work. */ +public abstract class AbstractUnitOfWork implements UnitOfWork, AutoCloseable { + private final List> nested = new ArrayList<>(); + private final HelenusSession session; + private final AbstractUnitOfWork parent; + private List postCommit = new ArrayList(); + private final Map> cache = new HashMap>(); + private boolean aborted = false; + private boolean committed = false; + + protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork parent) { + Objects.requireNonNull(session, "containing session cannot be null"); + + this.session = session; + this.parent = parent; + } + + public UnitOfWork addNestedUnitOfWork(UnitOfWork uow) { + synchronized (nested) { + nested.add((AbstractUnitOfWork) uow); + } + return this; + } + + public UnitOfWork begin() { + // log.record(txn::start) + return this; + } + + private void applyPostCommitFunctions() { + if (!postCommit.isEmpty()) { + for (CommitThunk f : postCommit) { + f.apply(); + } + } + } + + public Set cacheLookup(String key) { + Set r = getCache().get(key); + if (r != null) { + return r; + } else { + if (parent != null) { + return parent.cacheLookup(key); + } + } + return null; + } + + public Map> getCache() { return cache; } + + private Iterator> getChildNodes() { + return nested.iterator(); + } + + /** + * Checks to see if the work performed between calling begin and now can be committed or not. + * + * @return a function from which to chain work that only happens when commit is successful + * @throws E when the work overlaps with other concurrent writers. + */ + public PostCommitFunction commit() throws E { + // All nested UnitOfWork should be committed (not aborted) before calls to commit, check. + boolean canCommit = true; + TreeTraverser> traverser = TreeTraverser.using(node -> node::getChildNodes); + for (AbstractUnitOfWork uow : traverser.postOrderTraversal(this)) { + if (this != uow) { + canCommit &= (!uow.aborted && uow.committed); + } + } + + // log.record(txn::provisionalCommit) + // examine log for conflicts in read-set and write-set between begin and provisional commit + // if (conflict) { throw new ConflictingUnitOfWorkException(this) } + // else return function so as to enable commit.andThen(() -> { do something iff commit was successful; }) + + if (canCommit) { + committed = true; + aborted = false; + + // TODO(gburd): union this cache with parent's (if there is a parent) or with the session cache for all cacheable entities we currently hold + + nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit)); + + // Merge UOW cache into parent's cache. + if (parent != null) { + Map> parentCache = parent.getCache(); + for (String key : cache.keySet()) { + if (parentCache.containsKey(key)) { + // merge the sets + Set ps = parentCache.get(key); + ps.addAll(cache.get(key)); //TODO(gburd): review this, likely not correct in all cases as-is. + } else { + // add the missing set + parentCache.put(key, cache.get(key)); + } + } + } + + // Apply all post-commit functions for + if (parent == null) { + traverser.postOrderTraversal(this).forEach(uow -> { + uow.applyPostCommitFunctions(); + }); + return new PostCommitFunction(this, null); + } + } + // else { + // Constructor ctor = clazz.getConstructor(conflictExceptionClass); + // T object = ctor.newInstance(new Object[] { String message }); + // } + return new PostCommitFunction(this, postCommit); + } + + /* Explicitly discard the work and mark it as as such in the log. */ + public void abort() { + TreeTraverser> traverser = TreeTraverser.using(node -> node::getChildNodes); + traverser.postOrderTraversal(this).forEach(uow -> { + uow.committed = false; + uow.aborted = true; + }); + // log.record(txn::abort) + // cache.invalidateSince(txn::start time) + } + + public String describeConflicts() { + return "it's complex..."; + } + + @Override + public void close() throws E { + // Closing a AbstractUnitOfWork will abort iff we've not already aborted or committed this unit of work. + if (aborted == false && committed == false) { + abort(); + } + } + + public boolean hasAborted() { + return aborted; + } + + public boolean hasCommitted() { + return committed; + } + +} diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index ff63e98..e2a2104 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -30,10 +30,13 @@ import net.helenus.support.Fun; import net.helenus.support.Fun.Tuple1; import net.helenus.support.Fun.Tuple2; import net.helenus.support.Fun.Tuple6; +import net.helenus.support.HelenusException; import net.helenus.support.HelenusMappingException; import java.io.Closeable; import java.io.PrintStream; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -56,6 +59,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C private final MetricRegistry metricRegistry; private final Tracer zipkinTracer; private final PrintStream printStream; + private final Class unitOfWorkClass; private final SessionRepository sessionRepository; private final Executor executor; private final boolean dropSchemaOnClose; @@ -75,7 +79,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C Executor executor, boolean dropSchemaOnClose, ConsistencyLevel consistencyLevel, - Class conflictExceptionClass, + Class unitOfWorkClass, MetricRegistry metricRegistry, Tracer tracer) { this.session = session; @@ -89,7 +93,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C this.executor = executor; this.dropSchemaOnClose = dropSchemaOnClose; this.defaultConsistencyLevel = consistencyLevel; - UnitOfWork.conflictExceptionClass = conflictExceptionClass; + this.unitOfWorkClass = unitOfWorkClass; this.metricRegistry = metricRegistry; this.zipkinTracer = tracer; @@ -172,12 +176,23 @@ public final class HelenusSession extends AbstractSessionOperations implements C return metadata; } - public synchronized UnitOfWork begin() { return new UnitOfWork(this, null).begin(); } + public synchronized UnitOfWork begin() { + return begin(null); + } - public synchronized UnitOfWork begin(UnitOfWork parent) { - UnitOfWork child = new UnitOfWork(this, parent); - parent.addNestedUnitOfWork(child); - return child.begin(); + public synchronized UnitOfWork begin(UnitOfWork parent) { + try { + Class clazz = unitOfWorkClass; + Constructor ctor = clazz.getConstructor(HelenusSession.class, UnitOfWork.class); + UnitOfWork uow = ctor.newInstance(this, parent); + if (parent != null) { + parent.addNestedUnitOfWork(uow); + } + return uow.begin(); + } + catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { + throw new HelenusException(String.format("Unable to instantiate {} as a UnitOfWork.", unitOfWorkClass.getSimpleName()), e); + } } public SelectOperation select(E pojo) { diff --git a/src/main/java/net/helenus/core/SessionInitializer.java b/src/main/java/net/helenus/core/SessionInitializer.java index 0a58e7a..677eb1b 100644 --- a/src/main/java/net/helenus/core/SessionInitializer.java +++ b/src/main/java/net/helenus/core/SessionInitializer.java @@ -43,7 +43,7 @@ public final class SessionInitializer extends AbstractSessionOperations { private Tracer zipkinTracer; private PrintStream printStream = System.out; private Executor executor = MoreExecutors.directExecutor(); - private Class conflictingUnitOfWorkClass = ConflictingUnitOfWorkException.class; + private Class unitOfWorkClass = UnitOfWorkImpl.class; private SessionRepositoryBuilder sessionRepository; @@ -111,8 +111,8 @@ public final class SessionInitializer extends AbstractSessionOperations { return this; } - public SessionInitializer setConflictingUnitOfWorkException(Class e) { - this.conflictingUnitOfWorkClass = e; + public SessionInitializer setUnitOfWorkClass(Class e) { + this.unitOfWorkClass = e; return this; } @@ -241,7 +241,7 @@ public final class SessionInitializer extends AbstractSessionOperations { executor, autoDdl == AutoDdl.CREATE_DROP, consistencyLevel, - conflictingUnitOfWorkClass, + unitOfWorkClass, metricRegistry, zipkinTracer); } diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index c947c12..0cfa536 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -1,166 +1,58 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package net.helenus.core; -import com.diffplug.common.base.Errors; -import com.google.common.collect.TreeTraverser; -import net.helenus.support.HelenusException; +import net.helenus.support.Either; -import java.util.*; +import java.util.Map; +import java.util.Set; + +public interface UnitOfWork extends AutoCloseable { + + /** + * Marks the beginning of a transactional section of work. Will write a record to the shared + * write-ahead log. + * + * @return the handle used to commit or abort the work. + */ + UnitOfWork begin(); + + UnitOfWork addNestedUnitOfWork(UnitOfWork uow); + + /** + * Checks to see if the work performed between calling begin and now can be committed or not. + * + * @return a function from which to chain work that only happens when commit is successful + * @throws E when the work overlaps with other concurrent writers. + */ + PostCommitFunction commit() throws E; + + /** + * Explicitly abort the work within this unit of work. Any nested aborted unit of work + * will trigger the entire unit of work to commit. + */ + void abort(); -/** Encapsulates the concept of a "transaction" as a unit-of-work. */ -public class UnitOfWork implements AutoCloseable { - protected static ClassconflictExceptionClass = ConflictingUnitOfWorkException.class; - private final List nested = new ArrayList<>(); - private final HelenusSession session; - private final UnitOfWork parent; - private List postCommit = new ArrayList(); - private final Map> cache = new HashMap>(); - private boolean aborted = false; - private boolean committed = false; + boolean hasAborted(); - protected UnitOfWork(HelenusSession session, UnitOfWork parent) { - Objects.requireNonNull(session, "containing session cannot be null"); + boolean hasCommitted(); - this.session = session; - this.parent = parent; - } - - /** - * Marks the beginning of a transactional section of work. Will write a record to the shared - * write-ahead log. - * - * @return the handle used to commit or abort the work. - */ - protected UnitOfWork begin() { - // log.record(txn::start) - return this; - } - - protected UnitOfWork addNestedUnitOfWork(UnitOfWork uow) { - synchronized (nested) { - nested.add(uow); - } - return this; - } - - private void applyPostCommitFunctions() { - if (!postCommit.isEmpty()) { - for (CommitThunk f : postCommit) { - f.apply(); - } - } - } - - public Set cacheLookup(String key) { - Set r = getCache().get(key); - if (r != null) { - return r; - } else { - if (parent != null) { - return parent.cacheLookup(key); - } - } - return null; - } - - public Map> getCache() { return cache; } - - private Iterator getChildNodes() { - return nested.iterator(); - } - - /** - * Checks to see if the work performed between calling begin and now can be committed or not. - * - * @return a function from which to chain work that only happens when commit is successful - * @throws T when the work overlaps with other concurrent writers. - */ - public PostCommitFunction commit() throws T { - // All nested UnitOfWork should be committed (not aborted) before calls to commit, check. - boolean canCommit = true; - TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); - for (UnitOfWork uow : traverser.postOrderTraversal(this)) { - if (this != uow) { - canCommit &= (!uow.aborted && uow.committed); - } - } - - // log.record(txn::provisionalCommit) - // examine log for conflicts in read-set and write-set between begin and provisional commit - // if (conflict) { throw new ConflictingUnitOfWorkException(this) } - // else return function so as to enable commit.andThen(() -> { do something iff commit was successful; }) - - if (canCommit) { - committed = true; - aborted = false; - - // TODO(gburd): union this cache with parent's (if there is a parent) or with the session cache for all cacheable entities we currently hold - - nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit)); - - // Merge UOW cache into parent's cache. - if (parent != null) { - Map> parentCache = parent.getCache(); - for (String key : cache.keySet()) { - if (parentCache.containsKey(key)) { - // merge the sets - Set ps = parentCache.get(key); - ps.addAll(cache.get(key)); //TODO(gburd): review this, likely not correct in all cases as-is. - } else { - // add the missing set - parentCache.put(key, cache.get(key)); - } - } - } - - // Apply all post-commit functions for - if (parent == null) { - traverser.postOrderTraversal(this).forEach(uow -> { - uow.applyPostCommitFunctions(); - }); - return new PostCommitFunction(this, null); - } - } - // else { - // Constructor ctor = clazz.getConstructor(conflictExceptionClass); - // T object = ctor.newInstance(new Object[] { String message }); - // } - return new PostCommitFunction(this, postCommit); - } - - public void rollback() { - abort(); - } - - /* Explicitly discard the work and mark it as as such in the log. */ - public void abort() { - TreeTraverser traverser = TreeTraverser.using(node -> node::getChildNodes); - traverser.postOrderTraversal(this).forEach(uow -> { - uow.committed = false; - uow.aborted = true; - }); - // log.record(txn::abort) - // cache.invalidateSince(txn::start time) - } - - public String describeConflicts() { - return "it's complex..."; - } - - @Override - public void close() throws HelenusException { - // Closing a UnitOfWork will abort iff we've not already aborted or committed this unit of work. - if (aborted == false && committed == false) { - abort(); - } - } - - public boolean hasAborted() { - return aborted; - } - - public boolean hasCommitted() { - return committed; - } + //Either> cacheLookup(String key); + Set cacheLookup(String key); + Map> getCache(); } diff --git a/src/main/java/net/helenus/core/UnitOfWorkImpl.java b/src/main/java/net/helenus/core/UnitOfWorkImpl.java new file mode 100644 index 0000000..b9aab3b --- /dev/null +++ b/src/main/java/net/helenus/core/UnitOfWorkImpl.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.core; + +import net.helenus.support.HelenusException; + +class UnitOfWorkImpl extends AbstractUnitOfWork { + + @SuppressWarnings("unchecked") + public UnitOfWorkImpl(HelenusSession session, UnitOfWork parent) { + super(session, (AbstractUnitOfWork) parent); + } + +} diff --git a/src/main/java/net/helenus/core/operation/InsertOperation.java b/src/main/java/net/helenus/core/operation/InsertOperation.java index 52ad767..f05bf87 100644 --- a/src/main/java/net/helenus/core/operation/InsertOperation.java +++ b/src/main/java/net/helenus/core/operation/InsertOperation.java @@ -248,6 +248,7 @@ public final class InsertOperation extends AbstractOperation iface = entity.getMappingInterface(); if (resultType == iface) { diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index 81a9588..05403d1 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -49,11 +49,12 @@ public final class SelectOperation extends AbstractFilterStreamOperation() { + new Function() { @Override public E apply(Row source) { diff --git a/src/main/java/net/helenus/core/operation/UpdateOperation.java b/src/main/java/net/helenus/core/operation/UpdateOperation.java index f04c90e..ba358d6 100644 --- a/src/main/java/net/helenus/core/operation/UpdateOperation.java +++ b/src/main/java/net/helenus/core/operation/UpdateOperation.java @@ -579,6 +579,7 @@ public final class UpdateOperation extends AbstractFilterOperation V getColumnValue(Object sourceObj, int columnIndexUnused, HelenusProperty property) { UDTValue source = (UDTValue) sourceObj;