Re-work API for UnitOfWork so as to be able to instantiate custom sub-classes if need be.

This commit is contained in:
Greg Burd 2017-09-20 14:52:08 -04:00
parent ac7db5f243
commit 6e0877efb7
8 changed files with 275 additions and 170 deletions

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.helenus</groupId>
<artifactId>helenus-core</artifactId>
<version>2.0.39-SNAPSHOT</version>
<version>2.0.40-SNAPSHOT</version>
<packaging>jar</packaging>
<name>helenus</name>

View file

@ -0,0 +1,169 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core;
import com.diffplug.common.base.Errors;
import com.google.common.collect.TreeTraverser;
import java.util.*;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfWork, AutoCloseable {
private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>();
private final HelenusSession session;
private final AbstractUnitOfWork<E> parent;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
private final Map<String, Set<Object>> cache = new HashMap<String, Set<Object>>();
private boolean aborted = false;
private boolean committed = false;
protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork<E> parent) {
Objects.requireNonNull(session, "containing session cannot be null");
this.session = session;
this.parent = parent;
}
public UnitOfWork addNestedUnitOfWork(UnitOfWork uow) {
synchronized (nested) {
nested.add((AbstractUnitOfWork<E>) uow);
}
return this;
}
public UnitOfWork begin() {
// log.record(txn::start)
return this;
}
private void applyPostCommitFunctions() {
if (!postCommit.isEmpty()) {
for (CommitThunk f : postCommit) {
f.apply();
}
}
}
public Set<Object> cacheLookup(String key) {
Set<Object> r = getCache().get(key);
if (r != null) {
return r;
} else {
if (parent != null) {
return parent.cacheLookup(key);
}
}
return null;
}
public Map<String, Set<Object>> getCache() { return cache; }
private Iterator<AbstractUnitOfWork<E>> getChildNodes() {
return nested.iterator();
}
/**
* Checks to see if the work performed between calling begin and now can be committed or not.
*
* @return a function from which to chain work that only happens when commit is successful
* @throws E when the work overlaps with other concurrent writers.
*/
public PostCommitFunction<Void, Void> commit() throws E {
// All nested UnitOfWork should be committed (not aborted) before calls to commit, check.
boolean canCommit = true;
TreeTraverser<AbstractUnitOfWork<E>> traverser = TreeTraverser.using(node -> node::getChildNodes);
for (AbstractUnitOfWork<E> uow : traverser.postOrderTraversal(this)) {
if (this != uow) {
canCommit &= (!uow.aborted && uow.committed);
}
}
// log.record(txn::provisionalCommit)
// examine log for conflicts in read-set and write-set between begin and provisional commit
// if (conflict) { throw new ConflictingUnitOfWorkException(this) }
// else return function so as to enable commit.andThen(() -> { do something iff commit was successful; })
if (canCommit) {
committed = true;
aborted = false;
// TODO(gburd): union this cache with parent's (if there is a parent) or with the session cache for all cacheable entities we currently hold
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
// Merge UOW cache into parent's cache.
if (parent != null) {
Map<String, Set<Object>> parentCache = parent.getCache();
for (String key : cache.keySet()) {
if (parentCache.containsKey(key)) {
// merge the sets
Set<Object> ps = parentCache.get(key);
ps.addAll(cache.get(key)); //TODO(gburd): review this, likely not correct in all cases as-is.
} else {
// add the missing set
parentCache.put(key, cache.get(key));
}
}
}
// Apply all post-commit functions for
if (parent == null) {
traverser.postOrderTraversal(this).forEach(uow -> {
uow.applyPostCommitFunctions();
});
return new PostCommitFunction(this, null);
}
}
// else {
// Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass);
// T object = ctor.newInstance(new Object[] { String message });
// }
return new PostCommitFunction(this, postCommit);
}
/* Explicitly discard the work and mark it as as such in the log. */
public void abort() {
TreeTraverser<AbstractUnitOfWork<E>> traverser = TreeTraverser.using(node -> node::getChildNodes);
traverser.postOrderTraversal(this).forEach(uow -> {
uow.committed = false;
uow.aborted = true;
});
// log.record(txn::abort)
// cache.invalidateSince(txn::start time)
}
public String describeConflicts() {
return "it's complex...";
}
@Override
public void close() throws E {
// Closing a AbstractUnitOfWork will abort iff we've not already aborted or committed this unit of work.
if (aborted == false && committed == false) {
abort();
}
}
public boolean hasAborted() {
return aborted;
}
public boolean hasCommitted() {
return committed;
}
}

View file

@ -30,10 +30,13 @@ import net.helenus.support.Fun;
import net.helenus.support.Fun.Tuple1;
import net.helenus.support.Fun.Tuple2;
import net.helenus.support.Fun.Tuple6;
import net.helenus.support.HelenusException;
import net.helenus.support.HelenusMappingException;
import java.io.Closeable;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -56,6 +59,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
private final MetricRegistry metricRegistry;
private final Tracer zipkinTracer;
private final PrintStream printStream;
private final Class<? extends UnitOfWork> unitOfWorkClass;
private final SessionRepository sessionRepository;
private final Executor executor;
private final boolean dropSchemaOnClose;
@ -75,7 +79,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
Executor executor,
boolean dropSchemaOnClose,
ConsistencyLevel consistencyLevel,
Class<? extends Exception> conflictExceptionClass,
Class<? extends UnitOfWork> unitOfWorkClass,
MetricRegistry metricRegistry,
Tracer tracer) {
this.session = session;
@ -89,7 +93,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
this.executor = executor;
this.dropSchemaOnClose = dropSchemaOnClose;
this.defaultConsistencyLevel = consistencyLevel;
UnitOfWork.conflictExceptionClass = conflictExceptionClass;
this.unitOfWorkClass = unitOfWorkClass;
this.metricRegistry = metricRegistry;
this.zipkinTracer = tracer;
@ -172,12 +176,23 @@ public final class HelenusSession extends AbstractSessionOperations implements C
return metadata;
}
public synchronized <T extends Exception> UnitOfWork<T> begin() { return new UnitOfWork<T>(this, null).begin(); }
public synchronized UnitOfWork begin() {
return begin(null);
}
public synchronized <T extends Exception> UnitOfWork<T> begin(UnitOfWork<T> parent) {
UnitOfWork child = new UnitOfWork(this, parent);
parent.addNestedUnitOfWork(child);
return child.begin();
public synchronized UnitOfWork begin(UnitOfWork parent) {
try {
Class<? extends UnitOfWork> clazz = unitOfWorkClass;
Constructor<? extends UnitOfWork> ctor = clazz.getConstructor(HelenusSession.class, UnitOfWork.class);
UnitOfWork uow = ctor.newInstance(this, parent);
if (parent != null) {
parent.addNestedUnitOfWork(uow);
}
return uow.begin();
}
catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
throw new HelenusException(String.format("Unable to instantiate {} as a UnitOfWork.", unitOfWorkClass.getSimpleName()), e);
}
}
public <E> SelectOperation<E> select(E pojo) {

View file

@ -43,7 +43,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
private Tracer zipkinTracer;
private PrintStream printStream = System.out;
private Executor executor = MoreExecutors.directExecutor();
private Class<? extends Exception> conflictingUnitOfWorkClass = ConflictingUnitOfWorkException.class;
private Class<? extends UnitOfWork> unitOfWorkClass = UnitOfWorkImpl.class;
private SessionRepositoryBuilder sessionRepository;
@ -111,8 +111,8 @@ public final class SessionInitializer extends AbstractSessionOperations {
return this;
}
public SessionInitializer setConflictingUnitOfWorkException(Class<? extends Exception> e) {
this.conflictingUnitOfWorkClass = e;
public SessionInitializer setUnitOfWorkClass(Class<? extends UnitOfWork> e) {
this.unitOfWorkClass = e;
return this;
}
@ -241,7 +241,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
executor,
autoDdl == AutoDdl.CREATE_DROP,
consistencyLevel,
conflictingUnitOfWorkClass,
unitOfWorkClass,
metricRegistry,
zipkinTracer);
}

View file

@ -1,166 +1,58 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core;
import com.diffplug.common.base.Errors;
import com.google.common.collect.TreeTraverser;
import net.helenus.support.HelenusException;
import net.helenus.support.Either;
import java.util.*;
import java.util.Map;
import java.util.Set;
public interface UnitOfWork<E extends Exception> extends AutoCloseable {
/**
* Marks the beginning of a transactional section of work. Will write a record to the shared
* write-ahead log.
*
* @return the handle used to commit or abort the work.
*/
UnitOfWork begin();
UnitOfWork addNestedUnitOfWork(UnitOfWork uow);
/**
* Checks to see if the work performed between calling begin and now can be committed or not.
*
* @return a function from which to chain work that only happens when commit is successful
* @throws E when the work overlaps with other concurrent writers.
*/
PostCommitFunction<Void, Void> commit() throws E;
/**
* Explicitly abort the work within this unit of work. Any nested aborted unit of work
* will trigger the entire unit of work to commit.
*/
void abort();
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public class UnitOfWork<T extends Exception> implements AutoCloseable {
protected static Class<? extends Exception>conflictExceptionClass = ConflictingUnitOfWorkException.class;
private final List<UnitOfWork> nested = new ArrayList<>();
private final HelenusSession session;
private final UnitOfWork parent;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
private final Map<String, Set<Object>> cache = new HashMap<String, Set<Object>>();
private boolean aborted = false;
private boolean committed = false;
boolean hasAborted();
protected UnitOfWork(HelenusSession session, UnitOfWork parent) {
Objects.requireNonNull(session, "containing session cannot be null");
boolean hasCommitted();
this.session = session;
this.parent = parent;
}
/**
* Marks the beginning of a transactional section of work. Will write a record to the shared
* write-ahead log.
*
* @return the handle used to commit or abort the work.
*/
protected UnitOfWork begin() {
// log.record(txn::start)
return this;
}
protected UnitOfWork addNestedUnitOfWork(UnitOfWork uow) {
synchronized (nested) {
nested.add(uow);
}
return this;
}
private void applyPostCommitFunctions() {
if (!postCommit.isEmpty()) {
for (CommitThunk f : postCommit) {
f.apply();
}
}
}
public Set<Object> cacheLookup(String key) {
Set<Object> r = getCache().get(key);
if (r != null) {
return r;
} else {
if (parent != null) {
return parent.cacheLookup(key);
}
}
return null;
}
public Map<String, Set<Object>> getCache() { return cache; }
private Iterator<UnitOfWork> getChildNodes() {
return nested.iterator();
}
/**
* Checks to see if the work performed between calling begin and now can be committed or not.
*
* @return a function from which to chain work that only happens when commit is successful
* @throws T when the work overlaps with other concurrent writers.
*/
public PostCommitFunction<Void, Void> commit() throws T {
// All nested UnitOfWork should be committed (not aborted) before calls to commit, check.
boolean canCommit = true;
TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
for (UnitOfWork uow : traverser.postOrderTraversal(this)) {
if (this != uow) {
canCommit &= (!uow.aborted && uow.committed);
}
}
// log.record(txn::provisionalCommit)
// examine log for conflicts in read-set and write-set between begin and provisional commit
// if (conflict) { throw new ConflictingUnitOfWorkException(this) }
// else return function so as to enable commit.andThen(() -> { do something iff commit was successful; })
if (canCommit) {
committed = true;
aborted = false;
// TODO(gburd): union this cache with parent's (if there is a parent) or with the session cache for all cacheable entities we currently hold
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
// Merge UOW cache into parent's cache.
if (parent != null) {
Map<String, Set<Object>> parentCache = parent.getCache();
for (String key : cache.keySet()) {
if (parentCache.containsKey(key)) {
// merge the sets
Set<Object> ps = parentCache.get(key);
ps.addAll(cache.get(key)); //TODO(gburd): review this, likely not correct in all cases as-is.
} else {
// add the missing set
parentCache.put(key, cache.get(key));
}
}
}
// Apply all post-commit functions for
if (parent == null) {
traverser.postOrderTraversal(this).forEach(uow -> {
uow.applyPostCommitFunctions();
});
return new PostCommitFunction(this, null);
}
}
// else {
// Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass);
// T object = ctor.newInstance(new Object[] { String message });
// }
return new PostCommitFunction(this, postCommit);
}
public void rollback() {
abort();
}
/* Explicitly discard the work and mark it as as such in the log. */
public void abort() {
TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
traverser.postOrderTraversal(this).forEach(uow -> {
uow.committed = false;
uow.aborted = true;
});
// log.record(txn::abort)
// cache.invalidateSince(txn::start time)
}
public String describeConflicts() {
return "it's complex...";
}
@Override
public void close() throws HelenusException {
// Closing a UnitOfWork will abort iff we've not already aborted or committed this unit of work.
if (aborted == false && committed == false) {
abort();
}
}
public boolean hasAborted() {
return aborted;
}
public boolean hasCommitted() {
return committed;
}
//Either<Object, Set<Object>> cacheLookup(String key);
Set<Object> cacheLookup(String key);
Map<String, Set<Object>> getCache();
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core;
import net.helenus.support.HelenusException;
class UnitOfWorkImpl extends AbstractUnitOfWork<HelenusException> {
@SuppressWarnings("unchecked")
public UnitOfWorkImpl(HelenusSession session, UnitOfWork parent) {
super(session, (AbstractUnitOfWork<HelenusException>) parent);
}
}

View file

@ -49,11 +49,12 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
protected Integer limit = null;
protected boolean allowFiltering = false;
@SuppressWarnings("unchecked")
public SelectOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
this.rowMapper =
new Function<Row, E>() {
new Function<Row, E>() {
@Override
public E apply(Row source) {

View file

@ -34,6 +34,7 @@ public final class UDTColumnValueProvider implements ColumnValueProvider {
}
@Override
@SuppressWarnings("unchecked")
public <V> V getColumnValue(Object sourceObj, int columnIndexUnused, HelenusProperty property) {
UDTValue source = (UDTValue) sourceObj;