Formatting and fixes to use MapCache in the UOW.

This commit is contained in:
Greg Burd 2018-02-10 12:32:54 -05:00
parent b023ec359b
commit af4156079d
4 changed files with 363 additions and 390 deletions

View file

@ -24,12 +24,13 @@ import com.google.common.collect.TreeTraverser;
import java.io.Serializable; import java.io.Serializable;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.cache.Cache; import javax.cache.Cache;
import javax.cache.CacheManager; import javax.cache.CacheManager;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet; import net.helenus.core.cache.Facet;
import net.helenus.core.cache.MapCache; import net.helenus.core.cache.MapCache;
@ -53,7 +54,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
private final HelenusSession session; private final HelenusSession session;
public final AbstractUnitOfWork<E> parent; public final AbstractUnitOfWork<E> parent;
private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create(); private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
private final MapCache<String, Object> statementCache = new MapCache<String, Object>(null, "UOW(" + hashCode() + ")", this); private final MapCache<String, Object> statementCache;
protected String purpose; protected String purpose;
protected List<String> nestedPurposes = new ArrayList<String>(); protected List<String> nestedPurposes = new ArrayList<String>();
protected String info; protected String info;
@ -76,6 +77,31 @@ public abstract class AbstractUnitOfWork<E extends Exception>
this.session = session; this.session = session;
this.parent = parent; this.parent = parent;
CacheLoader cacheLoader = null;
if (parent != null) {
cacheLoader =
new CacheLoader<String, Object>() {
Cache<String, Object> cache = parent.getCache();
@Override
public Object load(String key) throws CacheLoaderException {
return cache.get(key);
}
@Override
public Map<String, Object> loadAll(Iterable<? extends String> keys)
throws CacheLoaderException {
Map<String, Object> kvp = new HashMap<String, Object>();
for (String key : keys) {
kvp.put(key, cache.get(key));
}
return kvp;
}
};
}
this.statementCache =
new MapCache<String, Object>(null, "UOW(" + hashCode() + ")", cacheLoader, true);
} }
@Override @Override
@ -378,7 +404,8 @@ public abstract class AbstractUnitOfWork<E extends Exception>
// Merge our statement cache into the session cache if it exists. // Merge our statement cache into the session cache if it exists.
CacheManager cacheManager = session.getCacheManager(); CacheManager cacheManager = session.getCacheManager();
if (cacheManager != null) { if (cacheManager != null) {
for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>)statementCache.<Map>unwrap(Map.class).entrySet()) { for (Map.Entry<String, Object> entry :
(Set<Map.Entry<String, Object>>) statementCache.<Map>unwrap(Map.class).entrySet()) {
String[] keyParts = entry.getKey().split("\\."); String[] keyParts = entry.getKey().split("\\.");
if (keyParts.length == 2) { if (keyParts.length == 2) {
String cacheName = keyParts[0]; String cacheName = keyParts[0];

View file

@ -20,9 +20,7 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import javax.cache.Cache; import javax.cache.Cache;
import net.helenus.core.cache.Facet; import net.helenus.core.cache.Facet;
import net.helenus.core.operation.AbstractOperation; import net.helenus.core.operation.AbstractOperation;

View file

@ -1,6 +1,5 @@
package net.helenus.core.cache; package net.helenus.core.cache;
import static net.helenus.core.HelenusSession.deleted;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -8,7 +7,6 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.cache.Cache; import javax.cache.Cache;
import javax.cache.CacheManager; import javax.cache.CacheManager;
import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.CacheEntryListenerConfiguration;
@ -21,13 +19,9 @@ import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry; import javax.cache.processor.MutableEntry;
import net.helenus.core.AbstractUnitOfWork;
import net.helenus.core.UnitOfWork;
public class MapCache<K, V> implements Cache<K, V> { public class MapCache<K, V> implements Cache<K, V> {
private final CacheManager manager; private final CacheManager manager;
private final String name; private final String name;
private final UnitOfWork uow;
private Map<K, V> map = new ConcurrentHashMap<K, V>(); private Map<K, V> map = new ConcurrentHashMap<K, V>();
private Set<CacheEntryRemovedListener> cacheEntryRemovedListeners = new HashSet<>(); private Set<CacheEntryRemovedListener> cacheEntryRemovedListeners = new HashSet<>();
private CacheLoader<K, V> cacheLoader = null; private CacheLoader<K, V> cacheLoader = null;
@ -36,50 +30,40 @@ public class MapCache<K, V> implements Cache<K, V> {
private static class MapConfiguration<K, V> implements Configuration<K, V> { private static class MapConfiguration<K, V> implements Configuration<K, V> {
@Override public Class<K> getKeyType() { @Override
public Class<K> getKeyType() {
return null; return null;
} }
@Override public Class<V> getValueType() { @Override
public Class<V> getValueType() {
return null; return null;
} }
@Override public boolean isStoreByValue() { @Override
public boolean isStoreByValue() {
return false; return false;
} }
} }
public MapCache(CacheManager manager, String name, UnitOfWork uow) { public MapCache(
CacheManager manager, String name, CacheLoader<K, V> cacheLoader, boolean isReadThrough) {
this.manager = manager; this.manager = manager;
this.name = name; this.name = name;
this.uow = uow; this.cacheLoader = cacheLoader;
this.isReadThrough = isReadThrough;
} }
private V map_get(K key) { /** {@inheritDoc} */
V value = null;
AbstractUnitOfWork uow = (AbstractUnitOfWork)this.uow;
do {
V result = (V) uow.getCache().get(key);
if (result != null) {
return result == deleted ? null : result;
}
uow = uow.parent;
} while (uow != null);
return null;
}
/**
* {@inheritDoc}
*/
@Override @Override
public V get(K key) { public V get(K key) {
V value = null; V value = null;
synchronized (map) { synchronized (map) {
value = map_get(key); value = map.get(key);
if (value == null && isReadThrough && cacheLoader != null) { if (value == null && isReadThrough && cacheLoader != null) {
V loadedValue = cacheLoader.load(key); V loadedValue = cacheLoader.load(key);
if (loadedValue != null) { if (loadedValue != null) {
map.put(key, value); map.put(key, loadedValue);
value = loadedValue; value = loadedValue;
} }
} }
@ -87,16 +71,14 @@ public class MapCache<K, V> implements Cache<K, V> {
return value; return value;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public Map<K, V> getAll(Set<? extends K> keys) { public Map<K, V> getAll(Set<? extends K> keys) {
Map<K, V> result = null; Map<K, V> result = null;
synchronized (map) { synchronized (map) {
result = new HashMap<K, V>(keys.size()); result = new HashMap<K, V>(keys.size());
for (K key : keys) { for (K key : keys) {
V value = map_get(key); V value = map.get(key);
if (value != null) { if (value != null) {
result.put(key, value); result.put(key, value);
keys.remove(key); keys.remove(key);
@ -119,19 +101,16 @@ public class MapCache<K, V> implements Cache<K, V> {
return result; return result;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public boolean containsKey(K key) { public boolean containsKey(K key) {
return map.containsKey(key); return map.containsKey(key);
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionListener) { public void loadAll(
Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionListener) {
if (cacheLoader != null) { if (cacheLoader != null) {
try { try {
synchronized (map) { synchronized (map) {
@ -161,22 +140,18 @@ public class MapCache<K, V> implements Cache<K, V> {
} }
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public void put(K key, V value) { public void put(K key, V value) {
map.put(key, value); map.put(key, value);
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public V getAndPut(K key, V value) { public V getAndPut(K key, V value) {
V result = null; V result = null;
synchronized (map) { synchronized (map) {
result = map_get(key); result = map.get(key);
if (value == null && isReadThrough && cacheLoader != null) { if (value == null && isReadThrough && cacheLoader != null) {
V loadedValue = cacheLoader.load(key); V loadedValue = cacheLoader.load(key);
if (loadedValue != null) { if (loadedValue != null) {
@ -189,9 +164,7 @@ public class MapCache<K, V> implements Cache<K, V> {
return result; return result;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public void putAll(Map<? extends K, ? extends V> map) { public void putAll(Map<? extends K, ? extends V> map) {
synchronized (map) { synchronized (map) {
@ -201,9 +174,7 @@ public class MapCache<K, V> implements Cache<K, V> {
} }
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public boolean putIfAbsent(K key, V value) { public boolean putIfAbsent(K key, V value) {
synchronized (map) { synchronized (map) {
@ -216,9 +187,7 @@ public class MapCache<K, V> implements Cache<K, V> {
} }
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public boolean remove(K key) { public boolean remove(K key) {
boolean removed = false; boolean removed = false;
@ -229,9 +198,7 @@ public class MapCache<K, V> implements Cache<K, V> {
return removed; return removed;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public boolean remove(K key, V oldValue) { public boolean remove(K key, V oldValue) {
synchronized (map) { synchronized (map) {
@ -245,9 +212,7 @@ public class MapCache<K, V> implements Cache<K, V> {
return false; return false;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public V getAndRemove(K key) { public V getAndRemove(K key) {
synchronized (map) { synchronized (map) {
@ -259,9 +224,7 @@ public class MapCache<K, V> implements Cache<K, V> {
} }
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public boolean replace(K key, V oldValue, V newValue) { public boolean replace(K key, V oldValue, V newValue) {
synchronized (map) { synchronized (map) {
@ -274,9 +237,7 @@ public class MapCache<K, V> implements Cache<K, V> {
return false; return false;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public boolean replace(K key, V value) { public boolean replace(K key, V value) {
synchronized (map) { synchronized (map) {
@ -288,9 +249,7 @@ public class MapCache<K, V> implements Cache<K, V> {
return false; return false;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public V getAndReplace(K key, V value) { public V getAndReplace(K key, V value) {
synchronized (map) { synchronized (map) {
@ -303,9 +262,7 @@ public class MapCache<K, V> implements Cache<K, V> {
return null; return null;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public void removeAll(Set<? extends K> keys) { public void removeAll(Set<? extends K> keys) {
synchronized (map) { synchronized (map) {
@ -320,9 +277,7 @@ public class MapCache<K, V> implements Cache<K, V> {
notifyRemovedListeners(keys); notifyRemovedListeners(keys);
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public void removeAll() { public void removeAll() {
synchronized (map) { synchronized (map) {
@ -332,17 +287,13 @@ public class MapCache<K, V> implements Cache<K, V> {
} }
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public void clear() { public void clear() {
map.clear(); map.clear();
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
if (!MapConfiguration.class.isAssignableFrom(clazz)) { if (!MapConfiguration.class.isAssignableFrom(clazz)) {
@ -351,18 +302,14 @@ public class MapCache<K, V> implements Cache<K, V> {
return null; return null;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments) public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments)
throws EntryProcessorException { throws EntryProcessorException {
return null; return null;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public <T> Map<K, EntryProcessorResult<T>> invokeAll( public <T> Map<K, EntryProcessorResult<T>> invokeAll(
Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... arguments) { Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... arguments) {
@ -370,12 +317,15 @@ public class MapCache<K, V> implements Cache<K, V> {
for (K key : keys) { for (K key : keys) {
V value = map.get(key); V value = map.get(key);
if (value != null) { if (value != null) {
entryProcessor.process(new MutableEntry<K, V>() { entryProcessor.process(
@Override public boolean exists() { new MutableEntry<K, V>() {
@Override
public boolean exists() {
return map.containsKey(key); return map.containsKey(key);
} }
@Override public void remove() { @Override
public void remove() {
synchronized (map) { synchronized (map) {
V value = map.get(key); V value = map.get(key);
if (value != null) { if (value != null) {
@ -385,86 +335,74 @@ public class MapCache<K, V> implements Cache<K, V> {
} }
} }
@Override public K getKey() { @Override
public K getKey() {
return key; return key;
} }
@Override public V getValue() { @Override
public V getValue() {
return map.get(value); return map.get(value);
} }
@Override public <T> T unwrap(Class<T> clazz) { @Override
public <T> T unwrap(Class<T> clazz) {
return null; return null;
} }
@Override public void setValue(V value) { @Override
public void setValue(V value) {
map.put(key, value); map.put(key, value);
} }
}, arguments); },
arguments);
} }
} }
} }
return null; return null;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public String getName() { public String getName() {
return name; return name;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public CacheManager getCacheManager() { public CacheManager getCacheManager() {
return manager; return manager;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public void close() { public void close() {}
}
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public boolean isClosed() { public boolean isClosed() {
return false; return false;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public <T> T unwrap(Class<T> clazz) { public <T> T unwrap(Class<T> clazz) {
return (T) map; return (T) map;
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public void registerCacheEntryListener( public void registerCacheEntryListener(
CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) { CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
//cacheEntryRemovedListeners.add(cacheEntryListenerConfiguration.getCacheEntryListenerFactory().create()); //cacheEntryRemovedListeners.add(cacheEntryListenerConfiguration.getCacheEntryListenerFactory().create());
} }
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public void deregisterCacheEntryListener( public void deregisterCacheEntryListener(
CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {} CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {}
/** /** {@inheritDoc} */
* {@inheritDoc}
*/
@Override @Override
public Iterator<Entry<K, V>> iterator() { public Iterator<Entry<K, V>> iterator() {
synchronized (map) { synchronized (map) {
@ -484,15 +422,18 @@ public class MapCache<K, V> implements Cache<K, V> {
K key = entry.getKey(); K key = entry.getKey();
V value = entry.getValue(); V value = entry.getValue();
@Override public K getKey() { @Override
public K getKey() {
return key; return key;
} }
@Override public V getValue() { @Override
public V getValue() {
return value; return value;
} }
@Override public <T> T unwrap(Class<T> clazz) { @Override
public <T> T unwrap(Class<T> clazz) {
return null; return null;
} }
}; };
@ -512,8 +453,5 @@ public class MapCache<K, V> implements Cache<K, V> {
// } // }
} }
private void notifyRemovedListeners(Set<? extends K> keys) { private void notifyRemovedListeners(Set<? extends K> keys) {}
}
} }

View file

@ -143,6 +143,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
Widget w1, w1a, w2, w3, w4; Widget w1, w1a, w2, w3, w4;
UUID key1 = UUIDs.timeBased(); UUID key1 = UUIDs.timeBased();
UUID key2 = UUIDs.timeBased(); UUID key2 = UUIDs.timeBased();
String cacheKey1 = MappingUtil.getTableName(Widget.class, false) + "." + key1.toString();
String cacheKey2 = MappingUtil.getTableName(Widget.class, false) + "." + key2.toString();
// This should inserted Widget, and not cache it in uow1. // This should inserted Widget, and not cache it in uow1.
try (UnitOfWork uow1 = session.begin()) { try (UnitOfWork uow1 = session.begin()) {
@ -156,6 +158,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.value(widget::c, RandomString.make(10)) .value(widget::c, RandomString.make(10))
.value(widget::d, RandomString.make(10)) .value(widget::d, RandomString.make(10))
.sync(uow1); .sync(uow1);
uow1.getCache().put(cacheKey1, w1);
Assert.assertEquals(w1, uow1.getCache().get(cacheKey1));
try (UnitOfWork uow2 = session.begin(uow1)) { try (UnitOfWork uow2 = session.begin(uow1)) {
@ -180,6 +184,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.orElse(null); .orElse(null);
Assert.assertEquals(w1, w2); Assert.assertEquals(w1, w2);
uow2.getCache().put(cacheKey2, w2);
w3 = w3 =
session session
@ -192,6 +197,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.value(widget::d, RandomString.make(10)) .value(widget::d, RandomString.make(10))
.sync(uow2); .sync(uow2);
Assert.assertEquals(w1, uow2.getCache().get(cacheKey1));
Assert.assertEquals(w2, uow2.getCache().get(cacheKey2));
uow2.commit() uow2.commit()
.andThen( .andThen(
() -> { () -> {
@ -378,6 +385,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
Widget w1, w2, w3, w4, w5, w6; Widget w1, w2, w3, w4, w5, w6;
Long committedAt = 0L; Long committedAt = 0L;
UUID key = UUIDs.timeBased(); UUID key = UUIDs.timeBased();
String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString();
try (UnitOfWork uow = session.begin()) { try (UnitOfWork uow = session.begin()) {
w1 = w1 =
@ -392,6 +400,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.batch(uow); .batch(uow);
Assert.assertTrue(0L == w1.writtenAt(widget::name)); Assert.assertTrue(0L == w1.writtenAt(widget::name));
Assert.assertTrue(0 == w1.ttlOf(widget::name)); Assert.assertTrue(0 == w1.ttlOf(widget::name));
uow.getCache().put(cacheKey, w1);
w2 = w2 =
session session
.<Widget>update(w1) .<Widget>update(w1)
@ -424,6 +433,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
.value(widget::d, RandomString.make(10)) .value(widget::d, RandomString.make(10))
.batch(uow); .batch(uow);
uow.getCache().put(cacheKey, w1);
uow.commit(); uow.commit();
committedAt = uow.committedAt(); committedAt = uow.committedAt();
Date d = new Date(committedAt * 1000); Date d = new Date(committedAt * 1000);