diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index 38926ce..614a5d5 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -24,12 +24,13 @@ import com.google.common.collect.TreeTraverser; import java.io.Serializable; import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.cache.Cache; import javax.cache.CacheManager; +import javax.cache.integration.CacheLoader; +import javax.cache.integration.CacheLoaderException; import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; import net.helenus.core.cache.MapCache; @@ -53,7 +54,7 @@ public abstract class AbstractUnitOfWork private final HelenusSession session; public final AbstractUnitOfWork parent; private final Table>> cache = HashBasedTable.create(); - private final MapCache statementCache = new MapCache(null, "UOW(" + hashCode() + ")", this); + private final MapCache statementCache; protected String purpose; protected List nestedPurposes = new ArrayList(); protected String info; @@ -76,6 +77,31 @@ public abstract class AbstractUnitOfWork this.session = session; this.parent = parent; + CacheLoader cacheLoader = null; + if (parent != null) { + cacheLoader = + new CacheLoader() { + + Cache cache = parent.getCache(); + + @Override + public Object load(String key) throws CacheLoaderException { + return cache.get(key); + } + + @Override + public Map loadAll(Iterable keys) + throws CacheLoaderException { + Map kvp = new HashMap(); + for (String key : keys) { + kvp.put(key, cache.get(key)); + } + return kvp; + } + }; + } + this.statementCache = + new MapCache(null, "UOW(" + hashCode() + ")", cacheLoader, true); } @Override @@ -284,7 +310,7 @@ public abstract class AbstractUnitOfWork @Override public Cache getCache() { - return statementCache; + return statementCache; } @Override @@ -354,10 +380,10 @@ public abstract class AbstractUnitOfWork applyPostCommitFunctions("aborted", abortThunks); }); - elapsedTime.stop(); - if (LOG.isInfoEnabled()) { - LOG.info(logTimers("aborted")); - } + elapsedTime.stop(); + if (LOG.isInfoEnabled()) { + LOG.info(logTimers("aborted")); + } } return new PostCommitFunction(this, null, null, false); @@ -378,12 +404,13 @@ public abstract class AbstractUnitOfWork // Merge our statement cache into the session cache if it exists. CacheManager cacheManager = session.getCacheManager(); if (cacheManager != null) { - for (Map.Entry entry : (Set>)statementCache.unwrap(Map.class).entrySet()) { + for (Map.Entry entry : + (Set>) statementCache.unwrap(Map.class).entrySet()) { String[] keyParts = entry.getKey().split("\\."); if (keyParts.length == 2) { String cacheName = keyParts[0]; String key = keyParts[1]; - if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) { + if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) { Cache cache = cacheManager.getCache(cacheName); if (cache != null) { Object value = entry.getValue(); @@ -477,10 +504,10 @@ public abstract class AbstractUnitOfWork }); if (parent == null) { - elapsedTime.stop(); - if (LOG.isInfoEnabled()) { - LOG.info(logTimers("aborted")); - } + elapsedTime.stop(); + if (LOG.isInfoEnabled()) { + LOG.info(logTimers("aborted")); + } } // TODO(gburd): when we integrate the transaction support we'll need to... diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 7d98e90..5cc1ce6 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -20,9 +20,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; - import javax.cache.Cache; - import net.helenus.core.cache.Facet; import net.helenus.core.operation.AbstractOperation; diff --git a/src/main/java/net/helenus/core/cache/MapCache.java b/src/main/java/net/helenus/core/cache/MapCache.java index 3c42b9a..2cef0b6 100644 --- a/src/main/java/net/helenus/core/cache/MapCache.java +++ b/src/main/java/net/helenus/core/cache/MapCache.java @@ -1,6 +1,5 @@ package net.helenus.core.cache; -import static net.helenus.core.HelenusSession.deleted; import java.util.HashMap; import java.util.HashSet; @@ -8,7 +7,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; - import javax.cache.Cache; import javax.cache.CacheManager; import javax.cache.configuration.CacheEntryListenerConfiguration; @@ -21,13 +19,9 @@ import javax.cache.processor.EntryProcessorException; import javax.cache.processor.EntryProcessorResult; import javax.cache.processor.MutableEntry; -import net.helenus.core.AbstractUnitOfWork; -import net.helenus.core.UnitOfWork; - public class MapCache implements Cache { private final CacheManager manager; private final String name; - private final UnitOfWork uow; private Map map = new ConcurrentHashMap(); private Set cacheEntryRemovedListeners = new HashSet<>(); private CacheLoader cacheLoader = null; @@ -36,484 +30,428 @@ public class MapCache implements Cache { private static class MapConfiguration implements Configuration { - @Override public Class getKeyType() { - return null; - } - - @Override public Class getValueType() { - return null; - } - - @Override public boolean isStoreByValue() { - return false; - } - } - - public MapCache(CacheManager manager, String name, UnitOfWork uow) { - this.manager = manager; - this.name = name; - this.uow = uow; - } - - private V map_get(K key) { - V value = null; - AbstractUnitOfWork uow = (AbstractUnitOfWork)this.uow; - do { - V result = (V) uow.getCache().get(key); - if (result != null) { - return result == deleted ? null : result; - } - uow = uow.parent; - } while (uow != null); + @Override + public Class getKeyType() { return null; - } - - /** - * {@inheritDoc} - */ - @Override - public V get(K key) { - V value = null; - synchronized (map) { - value = map_get(key); - if (value == null && isReadThrough && cacheLoader != null) { - V loadedValue = cacheLoader.load(key); - if (loadedValue != null) { - map.put(key, value); - value = loadedValue; - } - } - } - return value; - } - - /** - * {@inheritDoc} - */ - @Override - public Map getAll(Set keys) { - Map result = null; - synchronized (map) { - result = new HashMap(keys.size()); - for (K key : keys) { - V value = map_get(key); - if (value != null) { - result.put(key, value); - keys.remove(key); - } - } - if (isReadThrough && cacheLoader != null) { - for (K key : keys) { - Map loadedValues = cacheLoader.loadAll(keys); - for (Map.Entry entry : loadedValues.entrySet()) { - V v = entry.getValue(); - if (v != null) { - K k = entry.getKey(); - map.put(k, v); - result.put(k, v); - } - } - } - } - } - return result; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean containsKey(K key) { - return map.containsKey(key); - } - - /** - * {@inheritDoc} - */ - @Override - public void loadAll(Set keys, boolean replaceExistingValues, CompletionListener completionListener) { - if (cacheLoader != null) { - try { - synchronized (map) { - Map loadedValues = cacheLoader.loadAll(keys); - for (Map.Entry entry : loadedValues.entrySet()) { - V value = entry.getValue(); - K key = entry.getKey(); - if (value != null) { - boolean existsCurrently = map.containsKey(key); - if (!existsCurrently || replaceExistingValues) { - map.put(key, value); - keys.remove(key); - } - } - } - } - } catch (Exception e) { - if (completionListener != null) { - completionListener.onException(e); - } - } - } - if (completionListener != null) { - if (keys.isEmpty()) { - completionListener.onCompletion(); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void put(K key, V value) { - map.put(key, value); - } - - /** - * {@inheritDoc} - */ - @Override - public V getAndPut(K key, V value) { - V result = null; - synchronized (map) { - result = map_get(key); - if (value == null && isReadThrough && cacheLoader != null) { - V loadedValue = cacheLoader.load(key); - if (loadedValue != null) { - map.put(key, value); - value = loadedValue; - } - } - map.put(key, value); - } - return result; - } - - /** - * {@inheritDoc} - */ - @Override - public void putAll(Map map) { - synchronized (map) { - for (Map.Entry entry : map.entrySet()) { - this.map.put(entry.getKey(), entry.getValue()); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public boolean putIfAbsent(K key, V value) { - synchronized (map) { - if (!map.containsKey(key)) { - map.put(key, value); - return true; - } else { - return false; - } - } } - /** - * {@inheritDoc} - */ + @Override + public Class getValueType() { + return null; + } + + @Override + public boolean isStoreByValue() { + return false; + } + } + + public MapCache( + CacheManager manager, String name, CacheLoader cacheLoader, boolean isReadThrough) { + this.manager = manager; + this.name = name; + this.cacheLoader = cacheLoader; + this.isReadThrough = isReadThrough; + } + + /** {@inheritDoc} */ + @Override + public V get(K key) { + V value = null; + synchronized (map) { + value = map.get(key); + if (value == null && isReadThrough && cacheLoader != null) { + V loadedValue = cacheLoader.load(key); + if (loadedValue != null) { + map.put(key, loadedValue); + value = loadedValue; + } + } + } + return value; + } + + /** {@inheritDoc} */ + @Override + public Map getAll(Set keys) { + Map result = null; + synchronized (map) { + result = new HashMap(keys.size()); + for (K key : keys) { + V value = map.get(key); + if (value != null) { + result.put(key, value); + keys.remove(key); + } + } + if (isReadThrough && cacheLoader != null) { + for (K key : keys) { + Map loadedValues = cacheLoader.loadAll(keys); + for (Map.Entry entry : loadedValues.entrySet()) { + V v = entry.getValue(); + if (v != null) { + K k = entry.getKey(); + map.put(k, v); + result.put(k, v); + } + } + } + } + } + return result; + } + + /** {@inheritDoc} */ + @Override + public boolean containsKey(K key) { + return map.containsKey(key); + } + + /** {@inheritDoc} */ + @Override + public void loadAll( + Set keys, boolean replaceExistingValues, CompletionListener completionListener) { + if (cacheLoader != null) { + try { + synchronized (map) { + Map loadedValues = cacheLoader.loadAll(keys); + for (Map.Entry entry : loadedValues.entrySet()) { + V value = entry.getValue(); + K key = entry.getKey(); + if (value != null) { + boolean existsCurrently = map.containsKey(key); + if (!existsCurrently || replaceExistingValues) { + map.put(key, value); + keys.remove(key); + } + } + } + } + } catch (Exception e) { + if (completionListener != null) { + completionListener.onException(e); + } + } + } + if (completionListener != null) { + if (keys.isEmpty()) { + completionListener.onCompletion(); + } + } + } + + /** {@inheritDoc} */ + @Override + public void put(K key, V value) { + map.put(key, value); + } + + /** {@inheritDoc} */ + @Override + public V getAndPut(K key, V value) { + V result = null; + synchronized (map) { + result = map.get(key); + if (value == null && isReadThrough && cacheLoader != null) { + V loadedValue = cacheLoader.load(key); + if (loadedValue != null) { + map.put(key, value); + value = loadedValue; + } + } + map.put(key, value); + } + return result; + } + + /** {@inheritDoc} */ + @Override + public void putAll(Map map) { + synchronized (map) { + for (Map.Entry entry : map.entrySet()) { + this.map.put(entry.getKey(), entry.getValue()); + } + } + } + + /** {@inheritDoc} */ + @Override + public boolean putIfAbsent(K key, V value) { + synchronized (map) { + if (!map.containsKey(key)) { + map.put(key, value); + return true; + } else { + return false; + } + } + } + + /** {@inheritDoc} */ @Override public boolean remove(K key) { - boolean removed = false; - synchronized (map) { - removed = map.remove(key) != null; - notifyRemovedListeners(key); - } - return removed; + boolean removed = false; + synchronized (map) { + removed = map.remove(key) != null; + notifyRemovedListeners(key); + } + return removed; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public boolean remove(K key, V oldValue) { - synchronized (map) { - V value = map.get(key); - if (value != null && oldValue.equals(value)) { - map.remove(key); - notifyRemovedListeners(key); - return true; - } + synchronized (map) { + V value = map.get(key); + if (value != null && oldValue.equals(value)) { + map.remove(key); + notifyRemovedListeners(key); + return true; } - return false; + } + return false; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public V getAndRemove(K key) { - synchronized (map) { - V oldValue = null; - oldValue = map.get(key); - map.remove(key); - notifyRemovedListeners(key); - return oldValue; - } + synchronized (map) { + V oldValue = null; + oldValue = map.get(key); + map.remove(key); + notifyRemovedListeners(key); + return oldValue; + } } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public boolean replace(K key, V oldValue, V newValue) { - synchronized (map) { - V value = map.get(key); - if (value != null && oldValue.equals(value)) { - map.put(key, newValue); - return true; - } + synchronized (map) { + V value = map.get(key); + if (value != null && oldValue.equals(value)) { + map.put(key, newValue); + return true; } - return false; + } + return false; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public boolean replace(K key, V value) { - synchronized (map) { - if (map.containsKey(key)) { - map.put(key, value); - return true; - } + synchronized (map) { + if (map.containsKey(key)) { + map.put(key, value); + return true; } - return false; + } + return false; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public V getAndReplace(K key, V value) { - synchronized (map) { - V oldValue = map.get(key); - if (value != null && value.equals(oldValue)) { - map.put(key, value); - return oldValue; - } + synchronized (map) { + V oldValue = map.get(key); + if (value != null && value.equals(oldValue)) { + map.put(key, value); + return oldValue; } - return null; + } + return null; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public void removeAll(Set keys) { - synchronized (map) { - for (K key : keys) { - if (map.containsKey(key)) { - map.remove(key); - } else { - keys.remove(key); - } - } + synchronized (map) { + for (K key : keys) { + if (map.containsKey(key)) { + map.remove(key); + } else { + keys.remove(key); + } } - notifyRemovedListeners(keys); + } + notifyRemovedListeners(keys); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public void removeAll() { - synchronized (map) { - Set keys = map.keySet(); - map.clear(); - notifyRemovedListeners(keys); - } + synchronized (map) { + Set keys = map.keySet(); + map.clear(); + notifyRemovedListeners(keys); + } } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public void clear() { - map.clear(); + map.clear(); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public > C getConfiguration(Class clazz) { - if (!MapConfiguration.class.isAssignableFrom(clazz)) { - throw new IllegalArgumentException(); - } - return null; + if (!MapConfiguration.class.isAssignableFrom(clazz)) { + throw new IllegalArgumentException(); + } + return null; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public T invoke(K key, EntryProcessor entryProcessor, Object... arguments) throws EntryProcessorException { return null; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public Map> invokeAll( Set keys, EntryProcessor entryProcessor, Object... arguments) { - synchronized (map) { - for (K key : keys) { - V value = map.get(key); - if (value != null) { - entryProcessor.process(new MutableEntry() { - @Override public boolean exists() { - return map.containsKey(key); - } + synchronized (map) { + for (K key : keys) { + V value = map.get(key); + if (value != null) { + entryProcessor.process( + new MutableEntry() { + @Override + public boolean exists() { + return map.containsKey(key); + } - @Override public void remove() { - synchronized (map) { - V value = map.get(key); - if (value != null) { - map.remove(key); - notifyRemovedListeners(key); - } - } - } + @Override + public void remove() { + synchronized (map) { + V value = map.get(key); + if (value != null) { + map.remove(key); + notifyRemovedListeners(key); + } + } + } - @Override public K getKey() { - return key; - } + @Override + public K getKey() { + return key; + } - @Override public V getValue() { - return map.get(value); - } + @Override + public V getValue() { + return map.get(value); + } - @Override public T unwrap(Class clazz) { - return null; - } + @Override + public T unwrap(Class clazz) { + return null; + } - @Override public void setValue(V value) { - map.put(key, value); - } - }, arguments); - } - } + @Override + public void setValue(V value) { + map.put(key, value); + } + }, + arguments); + } } + } return null; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public String getName() { return name; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public CacheManager getCacheManager() { return manager; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override - public void close() { - } + public void close() {} - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public boolean isClosed() { return false; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public T unwrap(Class clazz) { return (T) map; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public void registerCacheEntryListener( CacheEntryListenerConfiguration cacheEntryListenerConfiguration) { - //cacheEntryRemovedListeners.add(cacheEntryListenerConfiguration.getCacheEntryListenerFactory().create()); + //cacheEntryRemovedListeners.add(cacheEntryListenerConfiguration.getCacheEntryListenerFactory().create()); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public void deregisterCacheEntryListener( CacheEntryListenerConfiguration cacheEntryListenerConfiguration) {} - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public Iterator> iterator() { - synchronized (map) { - return new Iterator>() { + synchronized (map) { + return new Iterator>() { - Iterator> entries = map.entrySet().iterator(); + Iterator> entries = map.entrySet().iterator(); - @Override - public boolean hasNext() { - return entries.hasNext(); - } + @Override + public boolean hasNext() { + return entries.hasNext(); + } - @Override - public Entry next() { - Map.Entry entry = entries.next(); - return new Entry() { - K key = entry.getKey(); - V value = entry.getValue(); + @Override + public Entry next() { + Map.Entry entry = entries.next(); + return new Entry() { + K key = entry.getKey(); + V value = entry.getValue(); - @Override public K getKey() { - return key; - } + @Override + public K getKey() { + return key; + } - @Override public V getValue() { - return value; - } + @Override + public V getValue() { + return value; + } - @Override public T unwrap(Class clazz) { - return null; - } - }; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } + @Override + public T unwrap(Class clazz) { + return null; + } }; - } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } } private void notifyRemovedListeners(K key) { -// if (cacheEntryRemovedListeners != null) { -// cacheEntryRemovedListeners.forEach(listener -> listener.onRemoved()) -// } - } - - private void notifyRemovedListeners(Set keys) { - + // if (cacheEntryRemovedListeners != null) { + // cacheEntryRemovedListeners.forEach(listener -> listener.onRemoved()) + // } } + private void notifyRemovedListeners(Set keys) {} } diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java index 4d81fad..b08a3ae 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -143,6 +143,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { Widget w1, w1a, w2, w3, w4; UUID key1 = UUIDs.timeBased(); UUID key2 = UUIDs.timeBased(); + String cacheKey1 = MappingUtil.getTableName(Widget.class, false) + "." + key1.toString(); + String cacheKey2 = MappingUtil.getTableName(Widget.class, false) + "." + key2.toString(); // This should inserted Widget, and not cache it in uow1. try (UnitOfWork uow1 = session.begin()) { @@ -156,6 +158,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .value(widget::c, RandomString.make(10)) .value(widget::d, RandomString.make(10)) .sync(uow1); + uow1.getCache().put(cacheKey1, w1); + Assert.assertEquals(w1, uow1.getCache().get(cacheKey1)); try (UnitOfWork uow2 = session.begin(uow1)) { @@ -180,6 +184,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .orElse(null); Assert.assertEquals(w1, w2); + uow2.getCache().put(cacheKey2, w2); w3 = session @@ -192,6 +197,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .value(widget::d, RandomString.make(10)) .sync(uow2); + Assert.assertEquals(w1, uow2.getCache().get(cacheKey1)); + Assert.assertEquals(w2, uow2.getCache().get(cacheKey2)); uow2.commit() .andThen( () -> { @@ -378,6 +385,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { Widget w1, w2, w3, w4, w5, w6; Long committedAt = 0L; UUID key = UUIDs.timeBased(); + String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString(); try (UnitOfWork uow = session.begin()) { w1 = @@ -392,6 +400,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .batch(uow); Assert.assertTrue(0L == w1.writtenAt(widget::name)); Assert.assertTrue(0 == w1.ttlOf(widget::name)); + uow.getCache().put(cacheKey, w1); w2 = session .update(w1) @@ -424,6 +433,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .value(widget::d, RandomString.make(10)) .batch(uow); + uow.getCache().put(cacheKey, w1); uow.commit(); committedAt = uow.committedAt(); Date d = new Date(committedAt * 1000);