Delegate-based evict-tracking cache, fixes to core javax.cache.Cache implemenation MapCache, and unit tests

This commit is contained in:
John de Freitas 2018-03-31 22:18:28 -04:00
parent b27bc7d9a9
commit 93a81e7fd0
3 changed files with 425 additions and 37 deletions

View file

@ -22,15 +22,32 @@ import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table; import com.google.common.collect.Table;
import com.google.common.collect.TreeTraverser; import com.google.common.collect.TreeTraverser;
import java.io.Serializable; import java.io.Serializable;
import java.util.*; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.cache.Cache; import javax.cache.Cache;
import javax.cache.CacheManager; import javax.cache.CacheManager;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Configuration;
import javax.cache.integration.CacheLoader; import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CompletionListener;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet; import net.helenus.core.cache.Facet;
import net.helenus.core.cache.MapCache; import net.helenus.core.cache.MapCache;
@ -51,7 +68,7 @@ public class UnitOfWork implements AutoCloseable {
public final UnitOfWork parent; public final UnitOfWork parent;
private final List<UnitOfWork> nested = new ArrayList<>(); private final List<UnitOfWork> nested = new ArrayList<>();
private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create(); private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
private final MapCache<String, Object> statementCache; private final EvictTrackingMapCache<String, Object> statementCache;
protected final HelenusSession session; protected final HelenusSession session;
protected String purpose; protected String purpose;
protected List<String> nestedPurposes = new ArrayList<String>(); protected List<String> nestedPurposes = new ArrayList<String>();
@ -106,7 +123,7 @@ public class UnitOfWork implements AutoCloseable {
}; };
} }
this.elapsedTime = Stopwatch.createUnstarted(); this.elapsedTime = Stopwatch.createUnstarted();
this.statementCache = new MapCache<String, Object>(null, "UOW(" + hashCode() + ")", cacheLoader, true); this.statementCache = new EvictTrackingMapCache<String, Object>(null, "UOW(" + hashCode() + ")", cacheLoader, true);
} }
public void addDatabaseTime(String name, Stopwatch amount) { public void addDatabaseTime(String name, Stopwatch amount) {
@ -567,4 +584,221 @@ public class UnitOfWork implements AutoCloseable {
public long committedAt() { public long committedAt() {
return committedAt; return committedAt;
} }
private static class EvictTrackingMapCache<K, V> implements Cache<K, V> {
private final Set<K> deletes;
private final Cache<K, V> delegate;
public EvictTrackingMapCache(CacheManager manager, String name, CacheLoader<K, V> cacheLoader,
boolean isReadThrough) {
deletes = Collections.synchronizedSet(new HashSet<>());
delegate = new MapCache<>(manager, name, cacheLoader, isReadThrough);
}
/** Non-interface method; should only be called by UnitOfWork when merging to an enclosing UnitOfWork. */
public Set<K> getDeletions() {
return new HashSet<>(deletes);
}
@Override
public V get(K key) {
if (deletes.contains(key)) {
return null;
}
return delegate.get(key);
}
@Override
public Map<K, V> getAll(Set<? extends K> keys) {
Set<? extends K> clonedKeys = new HashSet<>(keys);
clonedKeys.removeAll(deletes);
return delegate.getAll(clonedKeys);
}
@Override
public boolean containsKey(K key) {
if (deletes.contains(key)) {
return false;
}
return delegate.containsKey(key);
}
@Override
public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener listener) {
Set<? extends K> clonedKeys = new HashSet<>(keys);
clonedKeys.removeAll(deletes);
delegate.loadAll(clonedKeys, replaceExistingValues, listener);
}
@Override
public void put(K key, V value) {
if (deletes.contains(key)) {
deletes.remove(key);
}
delegate.put(key, value);
}
@Override
public V getAndPut(K key, V value) {
if (deletes.contains(key)) {
deletes.remove(key);
}
return delegate.getAndPut(key, value);
}
@Override
public void putAll(Map<? extends K, ? extends V> map) {
deletes.removeAll(map.keySet());
delegate.putAll(map);
}
@Override
public synchronized boolean putIfAbsent(K key, V value) {
if (!delegate.containsKey(key) && deletes.contains(key)) {
deletes.remove(key);
}
return delegate.putIfAbsent(key, value);
}
@Override
public boolean remove(K key) {
boolean removed = delegate.remove(key);
deletes.add(key);
return removed;
}
@Override
public boolean remove(K key, V value) {
boolean removed = delegate.remove(key, value);
if (removed) {
deletes.add(key);
}
return removed;
}
@Override
public V getAndRemove(K key) {
V value = delegate.getAndRemove(key);
deletes.add(key);
return value;
}
@Override
public void removeAll(Set<? extends K> keys) {
Set<? extends K> cloneKeys = new HashSet<>(keys);
delegate.removeAll(cloneKeys);
deletes.addAll(cloneKeys);
}
@Override
@SuppressWarnings("unchecked")
public synchronized void removeAll() {
Map<K, V> impl = delegate.unwrap(Map.class);
Set<K> keys = impl.keySet();
delegate.removeAll();
deletes.addAll(keys);
}
@Override
public void clear() {
delegate.clear();
deletes.clear();
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
if (deletes.contains(key)) {
return false;
}
return delegate.replace(key, oldValue, newValue);
}
@Override
public boolean replace(K key, V value) {
if (deletes.contains(key)) {
return false;
}
return delegate.replace(key, value);
}
@Override
public V getAndReplace(K key, V value) {
if (deletes.contains(key)) {
return null;
}
return delegate.getAndReplace(key, value);
}
@Override
public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
return delegate.getConfiguration(clazz);
}
@Override
public <T> T invoke(K key, EntryProcessor<K, V, T> processor, Object... arguments)
throws EntryProcessorException {
if (deletes.contains(key)) {
return null;
}
return delegate.invoke(key, processor, arguments);
}
@Override
public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> processor,
Object... arguments) {
Set<? extends K> clonedKeys = new HashSet<>(keys);
clonedKeys.removeAll(deletes);
return delegate.invokeAll(clonedKeys, processor, arguments);
}
@Override
public String getName() {
return delegate.getName();
}
@Override
public CacheManager getCacheManager() {
return delegate.getCacheManager();
}
@Override
public void close() {
delegate.close();
}
@Override
public boolean isClosed() {
return delegate.isClosed();
}
@Override
public <T> T unwrap(Class<T> clazz) {
return delegate.unwrap(clazz);
}
@Override
public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
delegate.registerCacheEntryListener(cacheEntryListenerConfiguration);
}
@Override
public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
delegate.deregisterCacheEntryListener(cacheEntryListenerConfiguration);
}
@Override
public Iterator<Entry<K, V>> iterator() {
return delegate.iterator();
}
}
} }

View file

@ -1,6 +1,5 @@
package net.helenus.core.cache; package net.helenus.core.cache;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -23,15 +22,14 @@ public class MapCache<K, V> implements Cache<K, V> {
private final CacheManager manager; private final CacheManager manager;
private final String name; private final String name;
private Map<K, V> map = new ConcurrentHashMap<>(); private Map<K, V> map = new ConcurrentHashMap<>();
private Set<K> deletes = Collections.synchronizedSet(new HashSet<>());
private Set<CacheEntryRemovedListener<K, V>> cacheEntryRemovedListeners = new HashSet<>(); private Set<CacheEntryRemovedListener<K, V>> cacheEntryRemovedListeners = new HashSet<>();
private CacheLoader<K, V> cacheLoader = null; private CacheLoader<K, V> cacheLoader = null;
private boolean isReadThrough = false; private boolean isReadThrough = false;
private static class MapConfiguration<K, V> implements Configuration<K, V> { private static class MapConfiguration<K, V> implements Configuration<K, V> {
private static final long serialVersionUID = 6093947542772516209L; private static final long serialVersionUID = 6093947542772516209L;
@Override @Override
public Class<K> getKeyType() { public Class<K> getKeyType() {
return null; return null;
} }
@ -55,10 +53,6 @@ public class MapCache<K, V> implements Cache<K, V> {
this.isReadThrough = isReadThrough; this.isReadThrough = isReadThrough;
} }
/** Non-interface method; should only be called by UnitOfWork when merging to an enclosing UnitOfWork. */
public Set<K> getDeletions() {
return new HashSet<>(deletes);
}
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
@ -83,23 +77,23 @@ public class MapCache<K, V> implements Cache<K, V> {
Map<K, V> result = null; Map<K, V> result = null;
synchronized (map) { synchronized (map) {
result = new HashMap<K, V>(keys.size()); result = new HashMap<K, V>(keys.size());
for (K key : keys) { Iterator<? extends K> it = keys.iterator();
while (it.hasNext()) {
K key = it.next();
V value = map.get(key); V value = map.get(key);
if (value != null) { if (value != null) {
result.put(key, value); result.put(key, value);
keys.remove(key); it.remove();
} }
} }
if (isReadThrough && cacheLoader != null) { if (keys.size() != 0 && isReadThrough && cacheLoader != null) {
for (K key : keys) { Map<K, V> loadedValues = cacheLoader.loadAll(keys);
Map<K, V> loadedValues = cacheLoader.loadAll(keys); for (Map.Entry<K, V> entry : loadedValues.entrySet()) {
for (Map.Entry<K, V> entry : loadedValues.entrySet()) { V v = entry.getValue();
V v = entry.getValue(); if (v != null) {
if (v != null) { K k = entry.getKey();
K k = entry.getKey(); map.put(k, v);
map.put(k, v); result.put(k, v);
result.put(k, v);
}
} }
} }
} }
@ -158,11 +152,10 @@ public class MapCache<K, V> implements Cache<K, V> {
V result = null; V result = null;
synchronized (map) { synchronized (map) {
result = map.get(key); result = map.get(key);
if (value == null && isReadThrough && cacheLoader != null) { if (result == null && isReadThrough && cacheLoader != null) {
V loadedValue = cacheLoader.load(key); V loadedValue = cacheLoader.load(key);
if (loadedValue != null) { if (loadedValue != null) {
map.put(key, value); result = loadedValue;
value = loadedValue;
} }
} }
map.put(key, value); map.put(key, value);
@ -199,7 +192,6 @@ public class MapCache<K, V> implements Cache<K, V> {
boolean removed = false; boolean removed = false;
synchronized (map) { synchronized (map) {
removed = map.remove(key) != null; removed = map.remove(key) != null;
deletes.add(key);
notifyRemovedListeners(key); notifyRemovedListeners(key);
} }
return removed; return removed;
@ -212,7 +204,6 @@ public class MapCache<K, V> implements Cache<K, V> {
V value = map.get(key); V value = map.get(key);
if (value != null && oldValue.equals(value)) { if (value != null && oldValue.equals(value)) {
map.remove(key); map.remove(key);
deletes.add(key);
notifyRemovedListeners(key); notifyRemovedListeners(key);
return true; return true;
} }
@ -227,7 +218,6 @@ public class MapCache<K, V> implements Cache<K, V> {
V oldValue = null; V oldValue = null;
oldValue = map.get(key); oldValue = map.get(key);
map.remove(key); map.remove(key);
deletes.add(key);
notifyRemovedListeners(key); notifyRemovedListeners(key);
return oldValue; return oldValue;
} }
@ -275,14 +265,15 @@ public class MapCache<K, V> implements Cache<K, V> {
@Override @Override
public void removeAll(Set<? extends K> keys) { public void removeAll(Set<? extends K> keys) {
synchronized (map) { synchronized (map) {
for (K key : keys) { Iterator<? extends K> it = keys.iterator();
while (it.hasNext()) {
K key = it.next();
if (map.containsKey(key)) { if (map.containsKey(key)) {
map.remove(key); map.remove(key);
} else { } else {
keys.remove(key); it.remove();
} }
} }
deletes.addAll(keys);
} }
notifyRemovedListeners(keys); notifyRemovedListeners(keys);
} }
@ -293,7 +284,6 @@ public class MapCache<K, V> implements Cache<K, V> {
synchronized (map) { synchronized (map) {
Set<K> keys = map.keySet(); Set<K> keys = map.keySet();
map.clear(); map.clear();
deletes.addAll(keys);
notifyRemovedListeners(keys); notifyRemovedListeners(keys);
} }
} }
@ -302,7 +292,6 @@ public class MapCache<K, V> implements Cache<K, V> {
@Override @Override
public void clear() { public void clear() {
map.clear(); map.clear();
deletes.clear();
} }
/** {@inheritDoc} */ /** {@inheritDoc} */
@ -318,6 +307,7 @@ public class MapCache<K, V> implements Cache<K, V> {
@Override @Override
public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments) public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments)
throws EntryProcessorException { throws EntryProcessorException {
// TODO
return null; return null;
} }

View file

@ -17,12 +17,12 @@ package net.helenus.test.integration.core.unitofwork;
import static net.helenus.core.Query.eq; import static net.helenus.core.Query.eq;
import ca.exprofesso.guava.jcache.GuavaCachingProvider;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.utils.UUIDs;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.Set;
import javax.cache.CacheManager; import javax.cache.CacheManager;
import javax.cache.Caching; import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration; import javax.cache.configuration.MutableConfiguration;
@ -42,6 +42,11 @@ import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import ca.exprofesso.guava.jcache.GuavaCachingProvider;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@Table @Table
@Cacheable @Cacheable
@ -550,4 +555,163 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
Assert.assertTrue(w1.equals(w4)); Assert.assertTrue(w1.equals(w4));
Assert.assertTrue(w4.equals(w1)); Assert.assertTrue(w4.equals(w1));
} }
@Test
public void getAllLoadAllTest() throws Exception {
String tableName = MappingUtil.getTableName(Widget.class, false).toString();
UUID uuid1 = UUIDs.random();
UUID uuid2 = UUIDs.random();
UUID uuid3 = UUIDs.random();
String k1 = tableName + "." + uuid1.toString();
String k2 = tableName + "." + uuid2.toString();
String k3 = tableName + "." + uuid3.toString();
Set<String> allKeys = ImmutableSet.<String>of(k1, k2, k3);
try (UnitOfWork uow1 = session.begin()) {
Widget w1 = session.<Widget>insert(widget).value(widget::id, uuid1).sync(uow1);
Widget w2 = session.<Widget>insert(widget).value(widget::id, uuid2).sync(uow1);
uow1.getCache().put(k1, w1);
uow1.getCache().put(k2, w2);
Map<String, Object> results = uow1.getCache().getAll(allKeys);
Assert.assertEquals(2, results.entrySet().size());
Assert.assertEquals(results, ImmutableMap.of(k1, w1, k2, w2));
// getAll tests
try (UnitOfWork uow2 = session.begin(uow1)) {
results = uow2.getCache().getAll(allKeys);
Assert.assertEquals(2, results.entrySet().size());
Assert.assertEquals(results, ImmutableMap.of(k1, w1, k2, w2));
Widget w3 = session.<Widget>insert(widget).value(widget::id, uuid3).sync(uow2);
uow2.getCache().put(k3, w3);
results = uow2.getCache().getAll(allKeys);
Assert.assertEquals(3, results.entrySet().size());
Assert.assertEquals(results, ImmutableMap.of(k1, w1, k2, w2, k3, w3));
boolean removed = uow2.getCache().remove(k2);
Assert.assertTrue(removed);
removed = uow2.getCache().remove(k2);
Assert.assertFalse(removed);
results = uow2.getCache().getAll(allKeys);
Assert.assertEquals(2, results.size());
Assert.assertEquals(results, ImmutableMap.of(k1, w1, k3, w3));
// Propagate changes to parent UOW for below tests.
uow2.commit();
}
// loadAll tests
try (UnitOfWork uow3 = session.begin(uow1)) {
uow3.getCache().loadAll(allKeys, false, null);
Assert.assertTrue(uow3.getCache().containsKey(k1));
Assert.assertTrue(uow3.getCache().containsKey(k3));
Assert.assertFalse(uow3.getCache().containsKey(k2));
Assert.assertEquals(w1, uow3.getCache().get(k1));
}
try (UnitOfWork uow4 = session.begin(uow1)) {
UUID uuid3Updated = UUIDs.random();
Widget w3Updated = session.<Widget>insert(widget).value(widget::id, uuid3Updated).sync(uow4);
// Insert a value for a known key, and load the cache without replacing existing values
uow4.getCache().put(k3, w3Updated);
Assert.assertEquals(w3Updated, uow4.getCache().get(k3));
uow4.getCache().loadAll(allKeys, false, null);
Assert.assertEquals(w3Updated, uow4.getCache().get(k3));
// Insert a value for a known key, and load the cache by replacing existing values
UnitOfWork uow5 = session.begin(uow1);
uow5.getCache().put(k3, w3Updated);
Assert.assertEquals(w3Updated, uow5.getCache().get(k3));
uow5.getCache().loadAll(allKeys, true, null);
Assert.assertNotNull(uow5.getCache().get(k3));
Assert.assertNotEquals(w3Updated, uow5.getCache().get(k3));
}
}
}
@Test
public void getAndPutTest() throws Exception {
String tableName = MappingUtil.getTableName(Widget.class, false).toString();
UUID uuid1 = UUIDs.random();
UUID uuid2 = UUIDs.random();
String k1 = tableName + "." + uuid1.toString();
try (UnitOfWork uow1 = session.begin()) {
Widget w1 = session.<Widget>insert(widget).value(widget::id, uuid1).sync(uow1);
uow1.getCache().put(k1, w1);
try (UnitOfWork uow2 = session.begin(uow1)) {
Widget w2 = session.<Widget>insert(widget).value(widget::id, uuid2).sync(uow2);
Widget value = (Widget) uow2.getCache().getAndPut(k1, w2);
Assert.assertEquals(w1, value);
value = (Widget) uow2.getCache().get(k1);
Assert.assertEquals(w2, value);
}
}
}
@Test
public void removeAllTest() throws Exception {
String tableName = MappingUtil.getTableName(Widget.class, false).toString();
UUID uuid1 = UUIDs.random();
UUID uuid2 = UUIDs.random();
String k1 = tableName + "." + uuid1.toString();
String k2 = tableName + "." + uuid2.toString();
Set<String> keys = ImmutableSet.of(k1, k2, "noValue");
try (UnitOfWork uow = session.begin()) {
Widget w1 = session.<Widget>insert(widget).value(widget::id, uuid1).sync(uow);
Widget w2 = session.<Widget>insert(widget).value(widget::id, uuid2).sync(uow);
uow.getCache().put(k1, w1);
uow.getCache().put(k2, w2);
uow.getCache().removeAll(keys);
}
}
@Test
public void testDeleteInNestedUOW() throws Exception {
String tableName = MappingUtil.getTableName(Widget.class, false).toString();
UUID uuid1 = UUIDs.random();
UUID uuid2 = UUIDs.random();
String k1 = tableName + "." + uuid1.toString();
String k2 = tableName + "." + uuid2.toString();
try (UnitOfWork uow1 = session.begin()) {
Widget w1 = session.<Widget>insert(widget).value(widget::id, uuid1)
.value(widget::name, RandomString.make(10))
.sync(uow1);
Widget w2 = session.<Widget>insert(widget).value(widget::id, uuid2)
.value(widget::name, RandomString.make(20))
.sync(uow1);
uow1.getCache().put(k1, w1);
uow1.getCache().put(k2, w2);
try (UnitOfWork uow2 = session.begin(uow1)) {
Object o1 = uow2.getCache().get(k1);
Object o2 = uow2.getCache().get(k2);
Assert.assertEquals(w1, o1);
Assert.assertEquals(w2, o2);
// k1 should not be available in uow2, but available in uow1.
uow2.getCache().remove(k1);
Assert.assertNull(uow2.getCache().get(k1));
Assert.assertNotNull(uow1.getCache().get(k1));
// Post-commit, k1 shouldn't be availble in uow1 either
uow2.commit();
Assert.assertNull(uow2.getCache().get(k1));
Assert.assertNull(uow1.getCache().get(k1));
try (UnitOfWork uow3 = session.begin(uow2)) {
uow3.getCache().get(k1);
uow3.getCache().get(k2);
uow3.getCache().remove(k2);
}
}
uow1.getCache().get(k1);
uow1.getCache().get(k2);
}
}
} }