Not-yet-tested tracking of UOW deletes
This commit is contained in:
parent
1f4c2154e2
commit
b27bc7d9a9
3 changed files with 31 additions and 34 deletions
|
@ -4,18 +4,17 @@ import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
public class PostCommitFunction<T, R> implements java.util.function.Function<T, R> {
|
public class PostCommitFunction<T, R> implements java.util.function.Function<T, R> {
|
||||||
|
public static final PostCommitFunction<Void, Void> NULL_ABORT = new PostCommitFunction<>(null, null, false);
|
||||||
|
public static final PostCommitFunction<Void, Void> NULL_COMMIT = new PostCommitFunction<>(null, null, true);
|
||||||
|
|
||||||
private final UnitOfWork uow;
|
|
||||||
private final List<CommitThunk> commitThunks;
|
private final List<CommitThunk> commitThunks;
|
||||||
private final List<CommitThunk> abortThunks;
|
private final List<CommitThunk> abortThunks;
|
||||||
private boolean committed;
|
private boolean committed;
|
||||||
|
|
||||||
PostCommitFunction(
|
PostCommitFunction(
|
||||||
UnitOfWork uow,
|
|
||||||
List<CommitThunk> postCommit,
|
List<CommitThunk> postCommit,
|
||||||
List<CommitThunk> abortThunks,
|
List<CommitThunk> abortThunks,
|
||||||
boolean committed) {
|
boolean committed) {
|
||||||
this.uow = uow;
|
|
||||||
this.commitThunks = postCommit;
|
this.commitThunks = postCommit;
|
||||||
this.abortThunks = abortThunks;
|
this.abortThunks = abortThunks;
|
||||||
this.committed = committed;
|
this.committed = committed;
|
||||||
|
|
|
@ -26,8 +26,6 @@ import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.regex.Matcher;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import javax.cache.Cache;
|
import javax.cache.Cache;
|
||||||
import javax.cache.CacheManager;
|
import javax.cache.CacheManager;
|
||||||
|
@ -48,9 +46,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
|
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
|
||||||
public class UnitOfWork implements AutoCloseable {
|
public class UnitOfWork implements AutoCloseable {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(UnitOfWork.class);
|
private static final Logger LOG = LoggerFactory.getLogger(UnitOfWork.class);
|
||||||
private static final Pattern classNameRegex = Pattern.compile("^(?:\\w+\\.)+(?:(\\w+)|(\\w+)\\$.*)$");
|
|
||||||
|
|
||||||
public final UnitOfWork parent;
|
public final UnitOfWork parent;
|
||||||
private final List<UnitOfWork> nested = new ArrayList<>();
|
private final List<UnitOfWork> nested = new ArrayList<>();
|
||||||
|
@ -65,7 +61,7 @@ public class UnitOfWork implements AutoCloseable {
|
||||||
protected int databaseLookups = 0;
|
protected int databaseLookups = 0;
|
||||||
protected final Stopwatch elapsedTime;
|
protected final Stopwatch elapsedTime;
|
||||||
protected Map<String, Double> databaseTime = new HashMap<>();
|
protected Map<String, Double> databaseTime = new HashMap<>();
|
||||||
protected double cacheLookupTimeMSecs = 0.0;
|
protected double cacheLookupTimeMSecs = 0.0d;
|
||||||
private List<CommitThunk> commitThunks = new ArrayList<CommitThunk>();
|
private List<CommitThunk> commitThunks = new ArrayList<CommitThunk>();
|
||||||
private List<CommitThunk> abortThunks = new ArrayList<CommitThunk>();
|
private List<CommitThunk> abortThunks = new ArrayList<CommitThunk>();
|
||||||
private List<CompletableFuture<?>> asyncOperationFutures = new ArrayList<CompletableFuture<?>>();
|
private List<CompletableFuture<?>> asyncOperationFutures = new ArrayList<CompletableFuture<?>>();
|
||||||
|
@ -74,17 +70,6 @@ public class UnitOfWork implements AutoCloseable {
|
||||||
private long committedAt = 0L;
|
private long committedAt = 0L;
|
||||||
private BatchOperation batch;
|
private BatchOperation batch;
|
||||||
|
|
||||||
private String extractClassNameFromStackFrame(String classNameOnStack) {
|
|
||||||
String name = null;
|
|
||||||
Matcher m = classNameRegex.matcher(classNameOnStack);
|
|
||||||
if (m.find()) {
|
|
||||||
name = (m.group(1) != null) ? m.group(1) : ((m.group(2) != null) ? m.group(2) : name);
|
|
||||||
} else {
|
|
||||||
name = classNameOnStack;
|
|
||||||
}
|
|
||||||
return name;
|
|
||||||
}
|
|
||||||
|
|
||||||
public UnitOfWork(HelenusSession session) {
|
public UnitOfWork(HelenusSession session) {
|
||||||
this(session, null);
|
this(session, null);
|
||||||
}
|
}
|
||||||
|
@ -97,7 +82,7 @@ public class UnitOfWork implements AutoCloseable {
|
||||||
parent.addNestedUnitOfWork(this);
|
parent.addNestedUnitOfWork(this);
|
||||||
}
|
}
|
||||||
this.session = session;
|
this.session = session;
|
||||||
CacheLoader cacheLoader = null;
|
CacheLoader<String, Object> cacheLoader = null;
|
||||||
if (parent != null) {
|
if (parent != null) {
|
||||||
cacheLoader =
|
cacheLoader =
|
||||||
new CacheLoader<String, Object>() {
|
new CacheLoader<String, Object>() {
|
||||||
|
@ -363,7 +348,7 @@ public class UnitOfWork implements AutoCloseable {
|
||||||
throws HelenusException, TimeoutException {
|
throws HelenusException, TimeoutException {
|
||||||
|
|
||||||
if (isDone()) {
|
if (isDone()) {
|
||||||
return new PostCommitFunction(this, null, null, false);
|
return PostCommitFunction.NULL_ABORT;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only the outer-most UOW batches statements for commit time, execute them.
|
// Only the outer-most UOW batches statements for commit time, execute them.
|
||||||
|
@ -399,7 +384,7 @@ public class UnitOfWork implements AutoCloseable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new PostCommitFunction(this, null, null, false);
|
return PostCommitFunction.NULL_ABORT;
|
||||||
} else {
|
} else {
|
||||||
committed = true;
|
committed = true;
|
||||||
aborted = false;
|
aborted = false;
|
||||||
|
@ -453,11 +438,11 @@ public class UnitOfWork implements AutoCloseable {
|
||||||
LOG.info(logTimers("committed"));
|
LOG.info(logTimers("committed"));
|
||||||
}
|
}
|
||||||
|
|
||||||
return new PostCommitFunction(this, null, null, true);
|
return PostCommitFunction.NULL_COMMIT;
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
// Merge cache and statistics into parent if there is one.
|
// Merge cache and statistics into parent if there is one.
|
||||||
parent.statementCache.putAll(statementCache.<Map>unwrap(Map.class));
|
parent.statementCache.putAll(statementCache.<Map>unwrap(Map.class));
|
||||||
|
parent.statementCache.removeAll(statementCache.getDeletions());
|
||||||
parent.mergeCache(cache);
|
parent.mergeCache(cache);
|
||||||
parent.addBatched(batch);
|
parent.addBatched(batch);
|
||||||
if (purpose != null) {
|
if (purpose != null) {
|
||||||
|
@ -483,15 +468,15 @@ public class UnitOfWork implements AutoCloseable {
|
||||||
// Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass);
|
// Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass);
|
||||||
// T object = ctor.newInstance(new Object[] { String message });
|
// T object = ctor.newInstance(new Object[] { String message });
|
||||||
// }
|
// }
|
||||||
return new PostCommitFunction(this, commitThunks, abortThunks, true);
|
return new PostCommitFunction<Void, Void>(commitThunks, abortThunks, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addBatched(BatchOperation batch) {
|
private void addBatched(BatchOperation batchArg) {
|
||||||
if (batch != null) {
|
if (batchArg != null) {
|
||||||
if (this.batch == null) {
|
if (this.batch == null) {
|
||||||
this.batch = batch;
|
this.batch = batchArg;
|
||||||
} else {
|
} else {
|
||||||
this.batch.addAll(batch);
|
this.batch.addAll(batchArg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package net.helenus.core.cache;
|
package net.helenus.core.cache;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -22,13 +22,14 @@ import javax.cache.processor.MutableEntry;
|
||||||
public class MapCache<K, V> implements Cache<K, V> {
|
public class MapCache<K, V> implements Cache<K, V> {
|
||||||
private final CacheManager manager;
|
private final CacheManager manager;
|
||||||
private final String name;
|
private final String name;
|
||||||
private Map<K, V> map = new ConcurrentHashMap<K, V>();
|
private Map<K, V> map = new ConcurrentHashMap<>();
|
||||||
private Set<CacheEntryRemovedListener> cacheEntryRemovedListeners = new HashSet<>();
|
private Set<K> deletes = Collections.synchronizedSet(new HashSet<>());
|
||||||
|
private Set<CacheEntryRemovedListener<K, V>> cacheEntryRemovedListeners = new HashSet<>();
|
||||||
private CacheLoader<K, V> cacheLoader = null;
|
private CacheLoader<K, V> cacheLoader = null;
|
||||||
private boolean isReadThrough = false;
|
private boolean isReadThrough = false;
|
||||||
private Configuration<K, V> configuration = new MapConfiguration<K, V>();
|
|
||||||
|
|
||||||
private static class MapConfiguration<K, V> implements Configuration<K, V> {
|
private static class MapConfiguration<K, V> implements Configuration<K, V> {
|
||||||
|
private static final long serialVersionUID = 6093947542772516209L;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Class<K> getKeyType() {
|
public Class<K> getKeyType() {
|
||||||
|
@ -54,6 +55,11 @@ public class MapCache<K, V> implements Cache<K, V> {
|
||||||
this.isReadThrough = isReadThrough;
|
this.isReadThrough = isReadThrough;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Non-interface method; should only be called by UnitOfWork when merging to an enclosing UnitOfWork. */
|
||||||
|
public Set<K> getDeletions() {
|
||||||
|
return new HashSet<>(deletes);
|
||||||
|
}
|
||||||
|
|
||||||
/** {@inheritDoc} */
|
/** {@inheritDoc} */
|
||||||
@Override
|
@Override
|
||||||
public V get(K key) {
|
public V get(K key) {
|
||||||
|
@ -193,6 +199,7 @@ public class MapCache<K, V> implements Cache<K, V> {
|
||||||
boolean removed = false;
|
boolean removed = false;
|
||||||
synchronized (map) {
|
synchronized (map) {
|
||||||
removed = map.remove(key) != null;
|
removed = map.remove(key) != null;
|
||||||
|
deletes.add(key);
|
||||||
notifyRemovedListeners(key);
|
notifyRemovedListeners(key);
|
||||||
}
|
}
|
||||||
return removed;
|
return removed;
|
||||||
|
@ -205,6 +212,7 @@ public class MapCache<K, V> implements Cache<K, V> {
|
||||||
V value = map.get(key);
|
V value = map.get(key);
|
||||||
if (value != null && oldValue.equals(value)) {
|
if (value != null && oldValue.equals(value)) {
|
||||||
map.remove(key);
|
map.remove(key);
|
||||||
|
deletes.add(key);
|
||||||
notifyRemovedListeners(key);
|
notifyRemovedListeners(key);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -219,6 +227,7 @@ public class MapCache<K, V> implements Cache<K, V> {
|
||||||
V oldValue = null;
|
V oldValue = null;
|
||||||
oldValue = map.get(key);
|
oldValue = map.get(key);
|
||||||
map.remove(key);
|
map.remove(key);
|
||||||
|
deletes.add(key);
|
||||||
notifyRemovedListeners(key);
|
notifyRemovedListeners(key);
|
||||||
return oldValue;
|
return oldValue;
|
||||||
}
|
}
|
||||||
|
@ -273,6 +282,7 @@ public class MapCache<K, V> implements Cache<K, V> {
|
||||||
keys.remove(key);
|
keys.remove(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
deletes.addAll(keys);
|
||||||
}
|
}
|
||||||
notifyRemovedListeners(keys);
|
notifyRemovedListeners(keys);
|
||||||
}
|
}
|
||||||
|
@ -283,6 +293,7 @@ public class MapCache<K, V> implements Cache<K, V> {
|
||||||
synchronized (map) {
|
synchronized (map) {
|
||||||
Set<K> keys = map.keySet();
|
Set<K> keys = map.keySet();
|
||||||
map.clear();
|
map.clear();
|
||||||
|
deletes.addAll(keys);
|
||||||
notifyRemovedListeners(keys);
|
notifyRemovedListeners(keys);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -291,6 +302,7 @@ public class MapCache<K, V> implements Cache<K, V> {
|
||||||
@Override
|
@Override
|
||||||
public void clear() {
|
public void clear() {
|
||||||
map.clear();
|
map.clear();
|
||||||
|
deletes.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** {@inheritDoc} */
|
/** {@inheritDoc} */
|
||||||
|
@ -386,6 +398,7 @@ public class MapCache<K, V> implements Cache<K, V> {
|
||||||
|
|
||||||
/** {@inheritDoc} */
|
/** {@inheritDoc} */
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public <T> T unwrap(Class<T> clazz) {
|
public <T> T unwrap(Class<T> clazz) {
|
||||||
return (T) map;
|
return (T) map;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue