Moving toward javax.cache.Cache-based UOW cache API.

This commit is contained in:
Greg Burd 2018-02-09 21:55:23 -05:00
parent 76b603f3d3
commit b023ec359b
4 changed files with 427 additions and 81 deletions

View file

@ -32,6 +32,7 @@ import javax.cache.Cache;
import javax.cache.CacheManager; import javax.cache.CacheManager;
import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet; import net.helenus.core.cache.Facet;
import net.helenus.core.cache.MapCache;
import net.helenus.core.operation.AbstractOperation; import net.helenus.core.operation.AbstractOperation;
import net.helenus.core.operation.BatchOperation; import net.helenus.core.operation.BatchOperation;
import net.helenus.mapping.MappingUtil; import net.helenus.mapping.MappingUtil;
@ -50,9 +51,9 @@ public abstract class AbstractUnitOfWork<E extends Exception>
private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>(); private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>();
private final HelenusSession session; private final HelenusSession session;
private final AbstractUnitOfWork<E> parent; public final AbstractUnitOfWork<E> parent;
private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create(); private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
private final Map<String, Object> statementCache = new ConcurrentHashMap<String, Object>(); private final MapCache<String, Object> statementCache = new MapCache<String, Object>(null, "UOW(" + hashCode() + ")", this);
protected String purpose; protected String purpose;
protected List<String> nestedPurposes = new ArrayList<String>(); protected List<String> nestedPurposes = new ArrayList<String>();
protected String info; protected String info;
@ -205,19 +206,6 @@ public abstract class AbstractUnitOfWork<E extends Exception>
} }
} }
@Override
public Optional<Object> cacheLookup(String key) {
AbstractUnitOfWork self = this;
do {
Object result = self.statementCache.get(key);
if (result != null) {
return result == deleted ? Optional.ofNullable(null) : Optional.of(result);
}
self = self.parent;
} while (self != null);
return Optional.empty();
}
@Override @Override
public Optional<Object> cacheLookup(List<Facet> facets) { public Optional<Object> cacheLookup(List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets); String tableName = CacheUtil.schemaName(facets);
@ -258,16 +246,6 @@ public abstract class AbstractUnitOfWork<E extends Exception>
return result; return result;
} }
@Override
public void cacheEvict(String key) {
statementCache.remove(key);
}
@Override
public void cacheDelete(String key) {
statementCache.put(key, deleted);
}
@Override @Override
public List<Facet> cacheEvict(List<Facet> facets) { public List<Facet> cacheEvict(List<Facet> facets) {
Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets); Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets);
@ -305,8 +283,8 @@ public abstract class AbstractUnitOfWork<E extends Exception>
} }
@Override @Override
public Object cacheUpdate(String key, Object value) { public Cache<String, Object> getCache() {
return statementCache.put(key, value); return statementCache;
} }
@Override @Override
@ -376,10 +354,10 @@ public abstract class AbstractUnitOfWork<E extends Exception>
applyPostCommitFunctions("aborted", abortThunks); applyPostCommitFunctions("aborted", abortThunks);
}); });
elapsedTime.stop(); elapsedTime.stop();
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
LOG.info(logTimers("aborted")); LOG.info(logTimers("aborted"));
} }
} }
return new PostCommitFunction(this, null, null, false); return new PostCommitFunction(this, null, null, false);
@ -400,12 +378,12 @@ public abstract class AbstractUnitOfWork<E extends Exception>
// Merge our statement cache into the session cache if it exists. // Merge our statement cache into the session cache if it exists.
CacheManager cacheManager = session.getCacheManager(); CacheManager cacheManager = session.getCacheManager();
if (cacheManager != null) { if (cacheManager != null) {
for (Map.Entry<String, Object> entry : statementCache.entrySet()) { for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>)statementCache.<Map>unwrap(Map.class).entrySet()) {
String[] keyParts = entry.getKey().split("\\."); String[] keyParts = entry.getKey().split("\\.");
if (keyParts.length == 2) { if (keyParts.length == 2) {
String cacheName = keyParts[0]; String cacheName = keyParts[0];
String key = keyParts[1]; String key = keyParts[1];
if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) { if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) {
Cache<Object, Object> cache = cacheManager.getCache(cacheName); Cache<Object, Object> cache = cacheManager.getCache(cacheName);
if (cache != null) { if (cache != null) {
Object value = entry.getValue(); Object value = entry.getValue();
@ -439,7 +417,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
} else { } else {
// Merge cache and statistics into parent if there is one. // Merge cache and statistics into parent if there is one.
parent.statementCache.putAll(statementCache); parent.statementCache.putAll(statementCache.<Map>unwrap(Map.class));
parent.mergeCache(cache); parent.mergeCache(cache);
parent.addBatched(batch); parent.addBatched(batch);
if (purpose != null) { if (purpose != null) {
@ -498,6 +476,13 @@ public abstract class AbstractUnitOfWork<E extends Exception>
uow.abortThunks.clear(); uow.abortThunks.clear();
}); });
if (parent == null) {
elapsedTime.stop();
if (LOG.isInfoEnabled()) {
LOG.info(logTimers("aborted"));
}
}
// TODO(gburd): when we integrate the transaction support we'll need to... // TODO(gburd): when we integrate the transaction support we'll need to...
// log.record(txn::abort) // log.record(txn::abort)
// cache.invalidateSince(txn::start time) // cache.invalidateSince(txn::start time)

View file

@ -20,6 +20,9 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import javax.cache.Cache;
import net.helenus.core.cache.Facet; import net.helenus.core.cache.Facet;
import net.helenus.core.operation.AbstractOperation; import net.helenus.core.operation.AbstractOperation;
@ -61,20 +64,14 @@ public interface UnitOfWork<X extends Exception> extends AutoCloseable {
void addFuture(CompletableFuture<?> future); void addFuture(CompletableFuture<?> future);
Optional<Object> cacheLookup(String key);
Optional<Object> cacheLookup(List<Facet> facets); Optional<Object> cacheLookup(List<Facet> facets);
Object cacheUpdate(String key, Object value); Cache<String, Object> getCache();
Object cacheUpdate(Object pojo, List<Facet> facets); Object cacheUpdate(Object pojo, List<Facet> facets);
void cacheEvict(String key);
List<Facet> cacheEvict(List<Facet> facets); List<Facet> cacheEvict(List<Facet> facets);
public void cacheDelete(String key);
String getPurpose(); String getPurpose();
UnitOfWork setPurpose(String purpose); UnitOfWork setPurpose(String purpose);

View file

@ -1,155 +1,519 @@
package net.helenus.core.cache; package net.helenus.core.cache;
import static net.helenus.core.HelenusSession.deleted;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.cache.Cache; import javax.cache.Cache;
import javax.cache.CacheManager; import javax.cache.CacheManager;
import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Configuration; import javax.cache.configuration.Configuration;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CompletionListener; import javax.cache.integration.CompletionListener;
import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException; import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import net.helenus.core.AbstractUnitOfWork;
import net.helenus.core.UnitOfWork;
public class MapCache<K, V> implements Cache<K, V> { public class MapCache<K, V> implements Cache<K, V> {
private final CacheManager manager;
private final String name;
private final UnitOfWork uow;
private Map<K, V> map = new ConcurrentHashMap<K, V>();
private Set<CacheEntryRemovedListener> cacheEntryRemovedListeners = new HashSet<>();
private CacheLoader<K, V> cacheLoader = null;
private boolean isReadThrough = false;
private Configuration<K, V> configuration = new MapConfiguration<K, V>();
private Map<K, V> map = new HashMap<K, V>(); private static class MapConfiguration<K, V> implements Configuration<K, V> {
@Override public Class<K> getKeyType() {
return null;
}
@Override public Class<V> getValueType() {
return null;
}
@Override public boolean isStoreByValue() {
return false;
}
}
public MapCache(CacheManager manager, String name, UnitOfWork uow) {
this.manager = manager;
this.name = name;
this.uow = uow;
}
private V map_get(K key) {
V value = null;
AbstractUnitOfWork uow = (AbstractUnitOfWork)this.uow;
do {
V result = (V) uow.getCache().get(key);
if (result != null) {
return result == deleted ? null : result;
}
uow = uow.parent;
} while (uow != null);
return null;
}
/**
* {@inheritDoc}
*/
@Override @Override
public V get(K key) { public V get(K key) {
return map.get(key); V value = null;
synchronized (map) {
value = map_get(key);
if (value == null && isReadThrough && cacheLoader != null) {
V loadedValue = cacheLoader.load(key);
if (loadedValue != null) {
map.put(key, value);
value = loadedValue;
}
}
}
return value;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public Map<K, V> getAll(Set<? extends K> keys) { public Map<K, V> getAll(Set<? extends K> keys) {
Map<K, V> result = new HashMap<K, V>(keys.size()); Map<K, V> result = null;
for (K key : keys) { synchronized (map) {
V value = map.get(key); result = new HashMap<K, V>(keys.size());
if (value != null) { for (K key : keys) {
result.put(key, value); V value = map_get(key);
if (value != null) {
result.put(key, value);
keys.remove(key);
}
}
if (isReadThrough && cacheLoader != null) {
for (K key : keys) {
Map<K, V> loadedValues = cacheLoader.loadAll(keys);
for (Map.Entry<K, V> entry : loadedValues.entrySet()) {
V v = entry.getValue();
if (v != null) {
K k = entry.getKey();
map.put(k, v);
result.put(k, v);
}
}
}
}
} }
} return result;
return result;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public boolean containsKey(K key) { public boolean containsKey(K key) {
return map.containsKey(key); return map.containsKey(key);
} }
/**
* {@inheritDoc}
*/
@Override @Override
public void loadAll( public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionListener) {
Set<? extends K> keys, if (cacheLoader != null) {
boolean replaceExistingValues, try {
CompletionListener completionListener) {} synchronized (map) {
Map<K, V> loadedValues = cacheLoader.loadAll(keys);
for (Map.Entry<K, V> entry : loadedValues.entrySet()) {
V value = entry.getValue();
K key = entry.getKey();
if (value != null) {
boolean existsCurrently = map.containsKey(key);
if (!existsCurrently || replaceExistingValues) {
map.put(key, value);
keys.remove(key);
}
}
}
}
} catch (Exception e) {
if (completionListener != null) {
completionListener.onException(e);
}
}
}
if (completionListener != null) {
if (keys.isEmpty()) {
completionListener.onCompletion();
}
}
}
/**
* {@inheritDoc}
*/
@Override @Override
public void put(K key, V value) {} public void put(K key, V value) {
map.put(key, value);
}
/**
* {@inheritDoc}
*/
@Override @Override
public V getAndPut(K key, V value) { public V getAndPut(K key, V value) {
return null; V result = null;
synchronized (map) {
result = map_get(key);
if (value == null && isReadThrough && cacheLoader != null) {
V loadedValue = cacheLoader.load(key);
if (loadedValue != null) {
map.put(key, value);
value = loadedValue;
}
}
map.put(key, value);
}
return result;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public void putAll(Map<? extends K, ? extends V> map) {} public void putAll(Map<? extends K, ? extends V> map) {
synchronized (map) {
for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
this.map.put(entry.getKey(), entry.getValue());
}
}
}
/**
* {@inheritDoc}
*/
@Override @Override
public boolean putIfAbsent(K key, V value) { public boolean putIfAbsent(K key, V value) {
return false; synchronized (map) {
} if (!map.containsKey(key)) {
map.put(key, value);
return true;
} else {
return false;
}
}
}
/**
* {@inheritDoc}
*/
@Override @Override
public boolean remove(K key) { public boolean remove(K key) {
return false; boolean removed = false;
synchronized (map) {
removed = map.remove(key) != null;
notifyRemovedListeners(key);
}
return removed;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public boolean remove(K key, V oldValue) { public boolean remove(K key, V oldValue) {
return false; synchronized (map) {
V value = map.get(key);
if (value != null && oldValue.equals(value)) {
map.remove(key);
notifyRemovedListeners(key);
return true;
}
}
return false;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public V getAndRemove(K key) { public V getAndRemove(K key) {
return null; synchronized (map) {
V oldValue = null;
oldValue = map.get(key);
map.remove(key);
notifyRemovedListeners(key);
return oldValue;
}
} }
/**
* {@inheritDoc}
*/
@Override @Override
public boolean replace(K key, V oldValue, V newValue) { public boolean replace(K key, V oldValue, V newValue) {
return false; synchronized (map) {
V value = map.get(key);
if (value != null && oldValue.equals(value)) {
map.put(key, newValue);
return true;
}
}
return false;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public boolean replace(K key, V value) { public boolean replace(K key, V value) {
return false; synchronized (map) {
if (map.containsKey(key)) {
map.put(key, value);
return true;
}
}
return false;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public V getAndReplace(K key, V value) { public V getAndReplace(K key, V value) {
return null; synchronized (map) {
V oldValue = map.get(key);
if (value != null && value.equals(oldValue)) {
map.put(key, value);
return oldValue;
}
}
return null;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public void removeAll(Set<? extends K> keys) {} public void removeAll(Set<? extends K> keys) {
synchronized (map) {
for (K key : keys) {
if (map.containsKey(key)) {
map.remove(key);
} else {
keys.remove(key);
}
}
}
notifyRemovedListeners(keys);
}
/**
* {@inheritDoc}
*/
@Override @Override
public void removeAll() {} public void removeAll() {
synchronized (map) {
Set<K> keys = map.keySet();
map.clear();
notifyRemovedListeners(keys);
}
}
/**
* {@inheritDoc}
*/
@Override @Override
public void clear() {} public void clear() {
map.clear();
}
/**
* {@inheritDoc}
*/
@Override @Override
public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
return null; if (!MapConfiguration.class.isAssignableFrom(clazz)) {
throw new IllegalArgumentException();
}
return null;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments) public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments)
throws EntryProcessorException { throws EntryProcessorException {
return null; return null;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public <T> Map<K, EntryProcessorResult<T>> invokeAll( public <T> Map<K, EntryProcessorResult<T>> invokeAll(
Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... arguments) { Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... arguments) {
synchronized (map) {
for (K key : keys) {
V value = map.get(key);
if (value != null) {
entryProcessor.process(new MutableEntry<K, V>() {
@Override public boolean exists() {
return map.containsKey(key);
}
@Override public void remove() {
synchronized (map) {
V value = map.get(key);
if (value != null) {
map.remove(key);
notifyRemovedListeners(key);
}
}
}
@Override public K getKey() {
return key;
}
@Override public V getValue() {
return map.get(value);
}
@Override public <T> T unwrap(Class<T> clazz) {
return null;
}
@Override public void setValue(V value) {
map.put(key, value);
}
}, arguments);
}
}
}
return null; return null;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public String getName() { public String getName() {
return null; return name;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public CacheManager getCacheManager() { public CacheManager getCacheManager() {
return null; return manager;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public void close() {} public void close() {
}
/**
* {@inheritDoc}
*/
@Override @Override
public boolean isClosed() { public boolean isClosed() {
return false; return false;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public <T> T unwrap(Class<T> clazz) { public <T> T unwrap(Class<T> clazz) {
return null; return (T) map;
} }
/**
* {@inheritDoc}
*/
@Override @Override
public void registerCacheEntryListener( public void registerCacheEntryListener(
CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {} CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
//cacheEntryRemovedListeners.add(cacheEntryListenerConfiguration.getCacheEntryListenerFactory().create());
}
/**
* {@inheritDoc}
*/
@Override @Override
public void deregisterCacheEntryListener( public void deregisterCacheEntryListener(
CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {} CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {}
/**
* {@inheritDoc}
*/
@Override @Override
public Iterator<Entry<K, V>> iterator() { public Iterator<Entry<K, V>> iterator() {
return null; synchronized (map) {
return new Iterator<Entry<K, V>>() {
Iterator<Map.Entry<K, V>> entries = map.entrySet().iterator();
@Override
public boolean hasNext() {
return entries.hasNext();
}
@Override
public Entry<K, V> next() {
Map.Entry<K, V> entry = entries.next();
return new Entry<K, V>() {
K key = entry.getKey();
V value = entry.getValue();
@Override public K getKey() {
return key;
}
@Override public V getValue() {
return value;
}
@Override public <T> T unwrap(Class<T> clazz) {
return null;
}
};
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
} }
private void notifyRemovedListeners(K key) {
// if (cacheEntryRemovedListeners != null) {
// cacheEntryRemovedListeners.forEach(listener -> listener.onRemoved())
// }
}
private void notifyRemovedListeners(Set<? extends K> keys) {
}
} }

View file

@ -348,11 +348,11 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null); session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null);
String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString(); String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString();
uow.cacheUpdate(cacheKey, w1); uow.getCache().put(cacheKey, w1);
// This should remove the object from the cache. // This should remove the object from the cache.
session.delete(widget).where(widget::id, eq(key)).sync(uow); session.delete(widget).where(widget::id, eq(key)).sync(uow);
uow.cacheDelete(cacheKey); uow.getCache().remove(cacheKey);
// This should fail to read from the cache. // This should fail to read from the cache.
w3 = w3 =
@ -473,7 +473,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.sync(uow); .sync(uow);
String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key1.toString(); String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key1.toString();
uow.cacheUpdate(cacheKey, w1); uow.getCache().put(cacheKey, w1);
/* /*
w2 = session.<Widget>upsert(w1) w2 = session.<Widget>upsert(w1)
.value(widget::a, RandomString.make(10)) .value(widget::a, RandomString.make(10))
@ -507,11 +507,11 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.sync(uow); .sync(uow);
String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString(); String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString();
uow.cacheUpdate(cacheKey, w1); uow.getCache().put(cacheKey, w1);
// This should read from the cache and get the same instance of a Widget. // This should read from the cache and get the same instance of a Widget.
w2 = w2 =
session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null); session.<Widget>select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null);
uow.cacheUpdate(cacheKey, w1); uow.getCache().put(cacheKey, w1);
uow.commit() uow.commit()
.andThen( .andThen(