From 93a81e7fd01f1d0e2ccef84b9ad1fe4a2fcc5617 Mon Sep 17 00:00:00 2001 From: John de Freitas Date: Sat, 31 Mar 2018 22:18:28 -0400 Subject: [PATCH] Delegate-based evict-tracking cache, fixes to core javax.cache.Cache implemenation MapCache, and unit tests --- .../java/net/helenus/core/UnitOfWork.java | 240 +++++++++++++++++- .../java/net/helenus/core/cache/MapCache.java | 52 ++-- .../core/unitofwork/UnitOfWorkTest.java | 170 ++++++++++++- 3 files changed, 425 insertions(+), 37 deletions(-) diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 9dca27d..dfcdf37 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -22,15 +22,32 @@ import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; import com.google.common.collect.TreeTraverser; import java.io.Serializable; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.cache.Cache; import javax.cache.CacheManager; +import javax.cache.configuration.CacheEntryListenerConfiguration; +import javax.cache.configuration.Configuration; import javax.cache.integration.CacheLoader; import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CompletionListener; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; + import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; import net.helenus.core.cache.MapCache; @@ -51,7 +68,7 @@ public class UnitOfWork implements AutoCloseable { public final UnitOfWork parent; private final List nested = new ArrayList<>(); private final Table>> cache = HashBasedTable.create(); - private final MapCache statementCache; + private final EvictTrackingMapCache statementCache; protected final HelenusSession session; protected String purpose; protected List nestedPurposes = new ArrayList(); @@ -106,7 +123,7 @@ public class UnitOfWork implements AutoCloseable { }; } this.elapsedTime = Stopwatch.createUnstarted(); - this.statementCache = new MapCache(null, "UOW(" + hashCode() + ")", cacheLoader, true); + this.statementCache = new EvictTrackingMapCache(null, "UOW(" + hashCode() + ")", cacheLoader, true); } public void addDatabaseTime(String name, Stopwatch amount) { @@ -567,4 +584,221 @@ public class UnitOfWork implements AutoCloseable { public long committedAt() { return committedAt; } + + private static class EvictTrackingMapCache implements Cache { + private final Set deletes; + private final Cache delegate; + + public EvictTrackingMapCache(CacheManager manager, String name, CacheLoader cacheLoader, + boolean isReadThrough) { + deletes = Collections.synchronizedSet(new HashSet<>()); + delegate = new MapCache<>(manager, name, cacheLoader, isReadThrough); + } + + /** Non-interface method; should only be called by UnitOfWork when merging to an enclosing UnitOfWork. */ + public Set getDeletions() { + return new HashSet<>(deletes); + } + + @Override + public V get(K key) { + if (deletes.contains(key)) { + return null; + } + + return delegate.get(key); + } + + @Override + public Map getAll(Set keys) { + Set clonedKeys = new HashSet<>(keys); + clonedKeys.removeAll(deletes); + return delegate.getAll(clonedKeys); + } + + @Override + public boolean containsKey(K key) { + if (deletes.contains(key)) { + return false; + } + + return delegate.containsKey(key); + } + + @Override + public void loadAll(Set keys, boolean replaceExistingValues, CompletionListener listener) { + Set clonedKeys = new HashSet<>(keys); + clonedKeys.removeAll(deletes); + delegate.loadAll(clonedKeys, replaceExistingValues, listener); + } + + @Override + public void put(K key, V value) { + if (deletes.contains(key)) { + deletes.remove(key); + } + + delegate.put(key, value); + } + + @Override + public V getAndPut(K key, V value) { + if (deletes.contains(key)) { + deletes.remove(key); + } + + return delegate.getAndPut(key, value); + } + + @Override + public void putAll(Map map) { + deletes.removeAll(map.keySet()); + delegate.putAll(map); + } + + @Override + public synchronized boolean putIfAbsent(K key, V value) { + if (!delegate.containsKey(key) && deletes.contains(key)) { + deletes.remove(key); + } + + return delegate.putIfAbsent(key, value); + } + + @Override + public boolean remove(K key) { + boolean removed = delegate.remove(key); + deletes.add(key); + return removed; + } + + @Override + public boolean remove(K key, V value) { + boolean removed = delegate.remove(key, value); + if (removed) { + deletes.add(key); + } + + return removed; + } + + @Override + public V getAndRemove(K key) { + V value = delegate.getAndRemove(key); + deletes.add(key); + return value; + } + + @Override + public void removeAll(Set keys) { + Set cloneKeys = new HashSet<>(keys); + delegate.removeAll(cloneKeys); + deletes.addAll(cloneKeys); + } + + @Override + @SuppressWarnings("unchecked") + public synchronized void removeAll() { + Map impl = delegate.unwrap(Map.class); + Set keys = impl.keySet(); + delegate.removeAll(); + deletes.addAll(keys); + } + + @Override + public void clear() { + delegate.clear(); + deletes.clear(); + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + if (deletes.contains(key)) { + return false; + } + + return delegate.replace(key, oldValue, newValue); + } + + @Override + public boolean replace(K key, V value) { + if (deletes.contains(key)) { + return false; + } + + return delegate.replace(key, value); + } + + @Override + public V getAndReplace(K key, V value) { + if (deletes.contains(key)) { + return null; + } + + return delegate.getAndReplace(key, value); + } + + @Override + public > C getConfiguration(Class clazz) { + return delegate.getConfiguration(clazz); + } + + @Override + public T invoke(K key, EntryProcessor processor, Object... arguments) + throws EntryProcessorException { + if (deletes.contains(key)) { + return null; + } + + return delegate.invoke(key, processor, arguments); + } + + @Override + public Map> invokeAll(Set keys, EntryProcessor processor, + Object... arguments) { + Set clonedKeys = new HashSet<>(keys); + clonedKeys.removeAll(deletes); + return delegate.invokeAll(clonedKeys, processor, arguments); + } + + @Override + public String getName() { + return delegate.getName(); + } + + @Override + public CacheManager getCacheManager() { + return delegate.getCacheManager(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public boolean isClosed() { + return delegate.isClosed(); + } + + @Override + public T unwrap(Class clazz) { + return delegate.unwrap(clazz); + } + + @Override + public void registerCacheEntryListener(CacheEntryListenerConfiguration cacheEntryListenerConfiguration) { + delegate.registerCacheEntryListener(cacheEntryListenerConfiguration); + } + + @Override + public void deregisterCacheEntryListener(CacheEntryListenerConfiguration cacheEntryListenerConfiguration) { + delegate.deregisterCacheEntryListener(cacheEntryListenerConfiguration); + } + + @Override + public Iterator> iterator() { + return delegate.iterator(); + } + } } diff --git a/src/main/java/net/helenus/core/cache/MapCache.java b/src/main/java/net/helenus/core/cache/MapCache.java index 7418efd..72c0326 100644 --- a/src/main/java/net/helenus/core/cache/MapCache.java +++ b/src/main/java/net/helenus/core/cache/MapCache.java @@ -1,6 +1,5 @@ package net.helenus.core.cache; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -23,15 +22,14 @@ public class MapCache implements Cache { private final CacheManager manager; private final String name; private Map map = new ConcurrentHashMap<>(); - private Set deletes = Collections.synchronizedSet(new HashSet<>()); private Set> cacheEntryRemovedListeners = new HashSet<>(); private CacheLoader cacheLoader = null; private boolean isReadThrough = false; private static class MapConfiguration implements Configuration { - private static final long serialVersionUID = 6093947542772516209L; + private static final long serialVersionUID = 6093947542772516209L; - @Override + @Override public Class getKeyType() { return null; } @@ -55,10 +53,6 @@ public class MapCache implements Cache { this.isReadThrough = isReadThrough; } - /** Non-interface method; should only be called by UnitOfWork when merging to an enclosing UnitOfWork. */ - public Set getDeletions() { - return new HashSet<>(deletes); - } /** {@inheritDoc} */ @Override @@ -83,23 +77,23 @@ public class MapCache implements Cache { Map result = null; synchronized (map) { result = new HashMap(keys.size()); - for (K key : keys) { + Iterator it = keys.iterator(); + while (it.hasNext()) { + K key = it.next(); V value = map.get(key); if (value != null) { result.put(key, value); - keys.remove(key); + it.remove(); } } - if (isReadThrough && cacheLoader != null) { - for (K key : keys) { - Map loadedValues = cacheLoader.loadAll(keys); - for (Map.Entry entry : loadedValues.entrySet()) { - V v = entry.getValue(); - if (v != null) { - K k = entry.getKey(); - map.put(k, v); - result.put(k, v); - } + if (keys.size() != 0 && isReadThrough && cacheLoader != null) { + Map loadedValues = cacheLoader.loadAll(keys); + for (Map.Entry entry : loadedValues.entrySet()) { + V v = entry.getValue(); + if (v != null) { + K k = entry.getKey(); + map.put(k, v); + result.put(k, v); } } } @@ -158,11 +152,10 @@ public class MapCache implements Cache { V result = null; synchronized (map) { result = map.get(key); - if (value == null && isReadThrough && cacheLoader != null) { + if (result == null && isReadThrough && cacheLoader != null) { V loadedValue = cacheLoader.load(key); if (loadedValue != null) { - map.put(key, value); - value = loadedValue; + result = loadedValue; } } map.put(key, value); @@ -199,7 +192,6 @@ public class MapCache implements Cache { boolean removed = false; synchronized (map) { removed = map.remove(key) != null; - deletes.add(key); notifyRemovedListeners(key); } return removed; @@ -212,7 +204,6 @@ public class MapCache implements Cache { V value = map.get(key); if (value != null && oldValue.equals(value)) { map.remove(key); - deletes.add(key); notifyRemovedListeners(key); return true; } @@ -227,7 +218,6 @@ public class MapCache implements Cache { V oldValue = null; oldValue = map.get(key); map.remove(key); - deletes.add(key); notifyRemovedListeners(key); return oldValue; } @@ -275,14 +265,15 @@ public class MapCache implements Cache { @Override public void removeAll(Set keys) { synchronized (map) { - for (K key : keys) { + Iterator it = keys.iterator(); + while (it.hasNext()) { + K key = it.next(); if (map.containsKey(key)) { map.remove(key); } else { - keys.remove(key); + it.remove(); } } - deletes.addAll(keys); } notifyRemovedListeners(keys); } @@ -293,7 +284,6 @@ public class MapCache implements Cache { synchronized (map) { Set keys = map.keySet(); map.clear(); - deletes.addAll(keys); notifyRemovedListeners(keys); } } @@ -302,7 +292,6 @@ public class MapCache implements Cache { @Override public void clear() { map.clear(); - deletes.clear(); } /** {@inheritDoc} */ @@ -318,6 +307,7 @@ public class MapCache implements Cache { @Override public T invoke(K key, EntryProcessor entryProcessor, Object... arguments) throws EntryProcessorException { + // TODO return null; } diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java index b08a3ae..e34e49f 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -17,12 +17,12 @@ package net.helenus.test.integration.core.unitofwork; import static net.helenus.core.Query.eq; -import ca.exprofesso.guava.jcache.GuavaCachingProvider; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.utils.UUIDs; + import java.io.Serializable; import java.util.Date; +import java.util.Map; import java.util.UUID; +import java.util.Set; import javax.cache.CacheManager; import javax.cache.Caching; import javax.cache.configuration.MutableConfiguration; @@ -42,6 +42,11 @@ import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import ca.exprofesso.guava.jcache.GuavaCachingProvider; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.utils.UUIDs; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; @Table @Cacheable @@ -550,4 +555,163 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { Assert.assertTrue(w1.equals(w4)); Assert.assertTrue(w4.equals(w1)); } + + @Test + public void getAllLoadAllTest() throws Exception { + String tableName = MappingUtil.getTableName(Widget.class, false).toString(); + UUID uuid1 = UUIDs.random(); + UUID uuid2 = UUIDs.random(); + UUID uuid3 = UUIDs.random(); + String k1 = tableName + "." + uuid1.toString(); + String k2 = tableName + "." + uuid2.toString(); + String k3 = tableName + "." + uuid3.toString(); + Set allKeys = ImmutableSet.of(k1, k2, k3); + + try (UnitOfWork uow1 = session.begin()) { + Widget w1 = session.insert(widget).value(widget::id, uuid1).sync(uow1); + Widget w2 = session.insert(widget).value(widget::id, uuid2).sync(uow1); + uow1.getCache().put(k1, w1); + uow1.getCache().put(k2, w2); + + Map results = uow1.getCache().getAll(allKeys); + Assert.assertEquals(2, results.entrySet().size()); + Assert.assertEquals(results, ImmutableMap.of(k1, w1, k2, w2)); + + // getAll tests + try (UnitOfWork uow2 = session.begin(uow1)) { + results = uow2.getCache().getAll(allKeys); + Assert.assertEquals(2, results.entrySet().size()); + Assert.assertEquals(results, ImmutableMap.of(k1, w1, k2, w2)); + + Widget w3 = session.insert(widget).value(widget::id, uuid3).sync(uow2); + uow2.getCache().put(k3, w3); + results = uow2.getCache().getAll(allKeys); + Assert.assertEquals(3, results.entrySet().size()); + Assert.assertEquals(results, ImmutableMap.of(k1, w1, k2, w2, k3, w3)); + + boolean removed = uow2.getCache().remove(k2); + Assert.assertTrue(removed); + removed = uow2.getCache().remove(k2); + Assert.assertFalse(removed); + results = uow2.getCache().getAll(allKeys); + Assert.assertEquals(2, results.size()); + Assert.assertEquals(results, ImmutableMap.of(k1, w1, k3, w3)); + + // Propagate changes to parent UOW for below tests. + uow2.commit(); + } + + // loadAll tests + try (UnitOfWork uow3 = session.begin(uow1)) { + uow3.getCache().loadAll(allKeys, false, null); + Assert.assertTrue(uow3.getCache().containsKey(k1)); + Assert.assertTrue(uow3.getCache().containsKey(k3)); + Assert.assertFalse(uow3.getCache().containsKey(k2)); + Assert.assertEquals(w1, uow3.getCache().get(k1)); + } + + try (UnitOfWork uow4 = session.begin(uow1)) { + UUID uuid3Updated = UUIDs.random(); + Widget w3Updated = session.insert(widget).value(widget::id, uuid3Updated).sync(uow4); + + // Insert a value for a known key, and load the cache without replacing existing values + uow4.getCache().put(k3, w3Updated); + Assert.assertEquals(w3Updated, uow4.getCache().get(k3)); + uow4.getCache().loadAll(allKeys, false, null); + Assert.assertEquals(w3Updated, uow4.getCache().get(k3)); + + // Insert a value for a known key, and load the cache by replacing existing values + UnitOfWork uow5 = session.begin(uow1); + uow5.getCache().put(k3, w3Updated); + Assert.assertEquals(w3Updated, uow5.getCache().get(k3)); + uow5.getCache().loadAll(allKeys, true, null); + Assert.assertNotNull(uow5.getCache().get(k3)); + Assert.assertNotEquals(w3Updated, uow5.getCache().get(k3)); + } + } + } + + @Test + public void getAndPutTest() throws Exception { + String tableName = MappingUtil.getTableName(Widget.class, false).toString(); + UUID uuid1 = UUIDs.random(); + UUID uuid2 = UUIDs.random(); + String k1 = tableName + "." + uuid1.toString(); + + try (UnitOfWork uow1 = session.begin()) { + Widget w1 = session.insert(widget).value(widget::id, uuid1).sync(uow1); + uow1.getCache().put(k1, w1); + try (UnitOfWork uow2 = session.begin(uow1)) { + Widget w2 = session.insert(widget).value(widget::id, uuid2).sync(uow2); + Widget value = (Widget) uow2.getCache().getAndPut(k1, w2); + Assert.assertEquals(w1, value); + value = (Widget) uow2.getCache().get(k1); + Assert.assertEquals(w2, value); + } + } + } + + @Test + public void removeAllTest() throws Exception { + String tableName = MappingUtil.getTableName(Widget.class, false).toString(); + UUID uuid1 = UUIDs.random(); + UUID uuid2 = UUIDs.random(); + String k1 = tableName + "." + uuid1.toString(); + String k2 = tableName + "." + uuid2.toString(); + Set keys = ImmutableSet.of(k1, k2, "noValue"); + + try (UnitOfWork uow = session.begin()) { + Widget w1 = session.insert(widget).value(widget::id, uuid1).sync(uow); + Widget w2 = session.insert(widget).value(widget::id, uuid2).sync(uow); + uow.getCache().put(k1, w1); + uow.getCache().put(k2, w2); + uow.getCache().removeAll(keys); + } + } + + @Test + public void testDeleteInNestedUOW() throws Exception { + String tableName = MappingUtil.getTableName(Widget.class, false).toString(); + UUID uuid1 = UUIDs.random(); + UUID uuid2 = UUIDs.random(); + String k1 = tableName + "." + uuid1.toString(); + String k2 = tableName + "." + uuid2.toString(); + + try (UnitOfWork uow1 = session.begin()) { + Widget w1 = session.insert(widget).value(widget::id, uuid1) + .value(widget::name, RandomString.make(10)) + .sync(uow1); + Widget w2 = session.insert(widget).value(widget::id, uuid2) + .value(widget::name, RandomString.make(20)) + .sync(uow1); + uow1.getCache().put(k1, w1); + uow1.getCache().put(k2, w2); + + try (UnitOfWork uow2 = session.begin(uow1)) { + Object o1 = uow2.getCache().get(k1); + Object o2 = uow2.getCache().get(k2); + Assert.assertEquals(w1, o1); + Assert.assertEquals(w2, o2); + + // k1 should not be available in uow2, but available in uow1. + uow2.getCache().remove(k1); + Assert.assertNull(uow2.getCache().get(k1)); + Assert.assertNotNull(uow1.getCache().get(k1)); + + // Post-commit, k1 shouldn't be availble in uow1 either + uow2.commit(); + Assert.assertNull(uow2.getCache().get(k1)); + Assert.assertNull(uow1.getCache().get(k1)); + + try (UnitOfWork uow3 = session.begin(uow2)) { + uow3.getCache().get(k1); + uow3.getCache().get(k2); + uow3.getCache().remove(k2); + } + } + + uow1.getCache().get(k1); + uow1.getCache().get(k2); + } + } }