From b023ec359bfd866307a0f1e7dcf0298eb41ba05b Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Fri, 9 Feb 2018 21:55:23 -0500 Subject: [PATCH] Moving toward javax.cache.Cache-based UOW cache API. --- .../net/helenus/core/AbstractUnitOfWork.java | 53 +-- .../java/net/helenus/core/UnitOfWork.java | 11 +- .../java/net/helenus/core/cache/MapCache.java | 434 ++++++++++++++++-- .../core/unitofwork/UnitOfWorkTest.java | 10 +- 4 files changed, 427 insertions(+), 81 deletions(-) diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index 46ad287..38926ce 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -32,6 +32,7 @@ import javax.cache.Cache; import javax.cache.CacheManager; import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; +import net.helenus.core.cache.MapCache; import net.helenus.core.operation.AbstractOperation; import net.helenus.core.operation.BatchOperation; import net.helenus.mapping.MappingUtil; @@ -50,9 +51,9 @@ public abstract class AbstractUnitOfWork private final List> nested = new ArrayList<>(); private final HelenusSession session; - private final AbstractUnitOfWork parent; + public final AbstractUnitOfWork parent; private final Table>> cache = HashBasedTable.create(); - private final Map statementCache = new ConcurrentHashMap(); + private final MapCache statementCache = new MapCache(null, "UOW(" + hashCode() + ")", this); protected String purpose; protected List nestedPurposes = new ArrayList(); protected String info; @@ -205,19 +206,6 @@ public abstract class AbstractUnitOfWork } } - @Override - public Optional cacheLookup(String key) { - AbstractUnitOfWork self = this; - do { - Object result = self.statementCache.get(key); - if (result != null) { - return result == deleted ? Optional.ofNullable(null) : Optional.of(result); - } - self = self.parent; - } while (self != null); - return Optional.empty(); - } - @Override public Optional cacheLookup(List facets) { String tableName = CacheUtil.schemaName(facets); @@ -258,16 +246,6 @@ public abstract class AbstractUnitOfWork return result; } - @Override - public void cacheEvict(String key) { - statementCache.remove(key); - } - - @Override - public void cacheDelete(String key) { - statementCache.put(key, deleted); - } - @Override public List cacheEvict(List facets) { Either> deletedObjectFacets = Either.right(facets); @@ -305,8 +283,8 @@ public abstract class AbstractUnitOfWork } @Override - public Object cacheUpdate(String key, Object value) { - return statementCache.put(key, value); + public Cache getCache() { + return statementCache; } @Override @@ -376,10 +354,10 @@ public abstract class AbstractUnitOfWork applyPostCommitFunctions("aborted", abortThunks); }); - elapsedTime.stop(); - if (LOG.isInfoEnabled()) { - LOG.info(logTimers("aborted")); - } + elapsedTime.stop(); + if (LOG.isInfoEnabled()) { + LOG.info(logTimers("aborted")); + } } return new PostCommitFunction(this, null, null, false); @@ -400,12 +378,12 @@ public abstract class AbstractUnitOfWork // Merge our statement cache into the session cache if it exists. CacheManager cacheManager = session.getCacheManager(); if (cacheManager != null) { - for (Map.Entry entry : statementCache.entrySet()) { + for (Map.Entry entry : (Set>)statementCache.unwrap(Map.class).entrySet()) { String[] keyParts = entry.getKey().split("\\."); if (keyParts.length == 2) { String cacheName = keyParts[0]; String key = keyParts[1]; - if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) { + if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) { Cache cache = cacheManager.getCache(cacheName); if (cache != null) { Object value = entry.getValue(); @@ -439,7 +417,7 @@ public abstract class AbstractUnitOfWork } else { // Merge cache and statistics into parent if there is one. - parent.statementCache.putAll(statementCache); + parent.statementCache.putAll(statementCache.unwrap(Map.class)); parent.mergeCache(cache); parent.addBatched(batch); if (purpose != null) { @@ -498,6 +476,13 @@ public abstract class AbstractUnitOfWork uow.abortThunks.clear(); }); + if (parent == null) { + elapsedTime.stop(); + if (LOG.isInfoEnabled()) { + LOG.info(logTimers("aborted")); + } + } + // TODO(gburd): when we integrate the transaction support we'll need to... // log.record(txn::abort) // cache.invalidateSince(txn::start time) diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 898558c..7d98e90 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -20,6 +20,9 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; + +import javax.cache.Cache; + import net.helenus.core.cache.Facet; import net.helenus.core.operation.AbstractOperation; @@ -61,20 +64,14 @@ public interface UnitOfWork extends AutoCloseable { void addFuture(CompletableFuture future); - Optional cacheLookup(String key); - Optional cacheLookup(List facets); - Object cacheUpdate(String key, Object value); + Cache getCache(); Object cacheUpdate(Object pojo, List facets); - void cacheEvict(String key); - List cacheEvict(List facets); - public void cacheDelete(String key); - String getPurpose(); UnitOfWork setPurpose(String purpose); diff --git a/src/main/java/net/helenus/core/cache/MapCache.java b/src/main/java/net/helenus/core/cache/MapCache.java index 3082081..3c42b9a 100644 --- a/src/main/java/net/helenus/core/cache/MapCache.java +++ b/src/main/java/net/helenus/core/cache/MapCache.java @@ -1,155 +1,519 @@ package net.helenus.core.cache; +import static net.helenus.core.HelenusSession.deleted; + import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + import javax.cache.Cache; import javax.cache.CacheManager; import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.Configuration; +import javax.cache.event.CacheEntryRemovedListener; +import javax.cache.integration.CacheLoader; import javax.cache.integration.CompletionListener; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.EntryProcessorResult; +import javax.cache.processor.MutableEntry; + +import net.helenus.core.AbstractUnitOfWork; +import net.helenus.core.UnitOfWork; public class MapCache implements Cache { + private final CacheManager manager; + private final String name; + private final UnitOfWork uow; + private Map map = new ConcurrentHashMap(); + private Set cacheEntryRemovedListeners = new HashSet<>(); + private CacheLoader cacheLoader = null; + private boolean isReadThrough = false; + private Configuration configuration = new MapConfiguration(); - private Map map = new HashMap(); + private static class MapConfiguration implements Configuration { + @Override public Class getKeyType() { + return null; + } + + @Override public Class getValueType() { + return null; + } + + @Override public boolean isStoreByValue() { + return false; + } + } + + public MapCache(CacheManager manager, String name, UnitOfWork uow) { + this.manager = manager; + this.name = name; + this.uow = uow; + } + + private V map_get(K key) { + V value = null; + AbstractUnitOfWork uow = (AbstractUnitOfWork)this.uow; + do { + V result = (V) uow.getCache().get(key); + if (result != null) { + return result == deleted ? null : result; + } + uow = uow.parent; + } while (uow != null); + return null; + } + + /** + * {@inheritDoc} + */ @Override public V get(K key) { - return map.get(key); + V value = null; + synchronized (map) { + value = map_get(key); + if (value == null && isReadThrough && cacheLoader != null) { + V loadedValue = cacheLoader.load(key); + if (loadedValue != null) { + map.put(key, value); + value = loadedValue; + } + } + } + return value; } + /** + * {@inheritDoc} + */ @Override public Map getAll(Set keys) { - Map result = new HashMap(keys.size()); - for (K key : keys) { - V value = map.get(key); - if (value != null) { - result.put(key, value); + Map result = null; + synchronized (map) { + result = new HashMap(keys.size()); + for (K key : keys) { + V value = map_get(key); + if (value != null) { + result.put(key, value); + keys.remove(key); + } + } + if (isReadThrough && cacheLoader != null) { + for (K key : keys) { + Map loadedValues = cacheLoader.loadAll(keys); + for (Map.Entry entry : loadedValues.entrySet()) { + V v = entry.getValue(); + if (v != null) { + K k = entry.getKey(); + map.put(k, v); + result.put(k, v); + } + } + } + } } - } - return result; + return result; } + /** + * {@inheritDoc} + */ @Override public boolean containsKey(K key) { - return map.containsKey(key); + return map.containsKey(key); } + /** + * {@inheritDoc} + */ @Override - public void loadAll( - Set keys, - boolean replaceExistingValues, - CompletionListener completionListener) {} + public void loadAll(Set keys, boolean replaceExistingValues, CompletionListener completionListener) { + if (cacheLoader != null) { + try { + synchronized (map) { + Map loadedValues = cacheLoader.loadAll(keys); + for (Map.Entry entry : loadedValues.entrySet()) { + V value = entry.getValue(); + K key = entry.getKey(); + if (value != null) { + boolean existsCurrently = map.containsKey(key); + if (!existsCurrently || replaceExistingValues) { + map.put(key, value); + keys.remove(key); + } + } + } + } + } catch (Exception e) { + if (completionListener != null) { + completionListener.onException(e); + } + } + } + if (completionListener != null) { + if (keys.isEmpty()) { + completionListener.onCompletion(); + } + } + } + /** + * {@inheritDoc} + */ @Override - public void put(K key, V value) {} + public void put(K key, V value) { + map.put(key, value); + } + /** + * {@inheritDoc} + */ @Override public V getAndPut(K key, V value) { - return null; + V result = null; + synchronized (map) { + result = map_get(key); + if (value == null && isReadThrough && cacheLoader != null) { + V loadedValue = cacheLoader.load(key); + if (loadedValue != null) { + map.put(key, value); + value = loadedValue; + } + } + map.put(key, value); + } + return result; } + /** + * {@inheritDoc} + */ @Override - public void putAll(Map map) {} + public void putAll(Map map) { + synchronized (map) { + for (Map.Entry entry : map.entrySet()) { + this.map.put(entry.getKey(), entry.getValue()); + } + } + } + /** + * {@inheritDoc} + */ @Override public boolean putIfAbsent(K key, V value) { - return false; - } + synchronized (map) { + if (!map.containsKey(key)) { + map.put(key, value); + return true; + } else { + return false; + } + } + } + /** + * {@inheritDoc} + */ @Override public boolean remove(K key) { - return false; + boolean removed = false; + synchronized (map) { + removed = map.remove(key) != null; + notifyRemovedListeners(key); + } + return removed; } + /** + * {@inheritDoc} + */ @Override public boolean remove(K key, V oldValue) { - return false; + synchronized (map) { + V value = map.get(key); + if (value != null && oldValue.equals(value)) { + map.remove(key); + notifyRemovedListeners(key); + return true; + } + } + return false; } + /** + * {@inheritDoc} + */ @Override public V getAndRemove(K key) { - return null; + synchronized (map) { + V oldValue = null; + oldValue = map.get(key); + map.remove(key); + notifyRemovedListeners(key); + return oldValue; + } } + /** + * {@inheritDoc} + */ @Override public boolean replace(K key, V oldValue, V newValue) { - return false; + synchronized (map) { + V value = map.get(key); + if (value != null && oldValue.equals(value)) { + map.put(key, newValue); + return true; + } + } + return false; } + /** + * {@inheritDoc} + */ @Override public boolean replace(K key, V value) { - return false; + synchronized (map) { + if (map.containsKey(key)) { + map.put(key, value); + return true; + } + } + return false; } + /** + * {@inheritDoc} + */ @Override public V getAndReplace(K key, V value) { - return null; + synchronized (map) { + V oldValue = map.get(key); + if (value != null && value.equals(oldValue)) { + map.put(key, value); + return oldValue; + } + } + return null; } + /** + * {@inheritDoc} + */ @Override - public void removeAll(Set keys) {} + public void removeAll(Set keys) { + synchronized (map) { + for (K key : keys) { + if (map.containsKey(key)) { + map.remove(key); + } else { + keys.remove(key); + } + } + } + notifyRemovedListeners(keys); + } + /** + * {@inheritDoc} + */ @Override - public void removeAll() {} + public void removeAll() { + synchronized (map) { + Set keys = map.keySet(); + map.clear(); + notifyRemovedListeners(keys); + } + } + /** + * {@inheritDoc} + */ @Override - public void clear() {} + public void clear() { + map.clear(); + } + /** + * {@inheritDoc} + */ @Override public > C getConfiguration(Class clazz) { - return null; + if (!MapConfiguration.class.isAssignableFrom(clazz)) { + throw new IllegalArgumentException(); + } + return null; } + /** + * {@inheritDoc} + */ @Override public T invoke(K key, EntryProcessor entryProcessor, Object... arguments) throws EntryProcessorException { return null; } + /** + * {@inheritDoc} + */ @Override public Map> invokeAll( Set keys, EntryProcessor entryProcessor, Object... arguments) { + synchronized (map) { + for (K key : keys) { + V value = map.get(key); + if (value != null) { + entryProcessor.process(new MutableEntry() { + @Override public boolean exists() { + return map.containsKey(key); + } + + @Override public void remove() { + synchronized (map) { + V value = map.get(key); + if (value != null) { + map.remove(key); + notifyRemovedListeners(key); + } + } + } + + @Override public K getKey() { + return key; + } + + @Override public V getValue() { + return map.get(value); + } + + @Override public T unwrap(Class clazz) { + return null; + } + + @Override public void setValue(V value) { + map.put(key, value); + } + }, arguments); + } + } + } return null; } + /** + * {@inheritDoc} + */ @Override public String getName() { - return null; + return name; } + /** + * {@inheritDoc} + */ @Override public CacheManager getCacheManager() { - return null; + return manager; } + /** + * {@inheritDoc} + */ @Override - public void close() {} + public void close() { + } + /** + * {@inheritDoc} + */ @Override public boolean isClosed() { return false; } + /** + * {@inheritDoc} + */ @Override public T unwrap(Class clazz) { - return null; + return (T) map; } + /** + * {@inheritDoc} + */ @Override public void registerCacheEntryListener( - CacheEntryListenerConfiguration cacheEntryListenerConfiguration) {} + CacheEntryListenerConfiguration cacheEntryListenerConfiguration) { + //cacheEntryRemovedListeners.add(cacheEntryListenerConfiguration.getCacheEntryListenerFactory().create()); + } + /** + * {@inheritDoc} + */ @Override public void deregisterCacheEntryListener( CacheEntryListenerConfiguration cacheEntryListenerConfiguration) {} + /** + * {@inheritDoc} + */ @Override public Iterator> iterator() { - return null; + synchronized (map) { + return new Iterator>() { + + Iterator> entries = map.entrySet().iterator(); + + @Override + public boolean hasNext() { + return entries.hasNext(); + } + + @Override + public Entry next() { + Map.Entry entry = entries.next(); + return new Entry() { + K key = entry.getKey(); + V value = entry.getValue(); + + @Override public K getKey() { + return key; + } + + @Override public V getValue() { + return value; + } + + @Override public T unwrap(Class clazz) { + return null; + } + }; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } } + + private void notifyRemovedListeners(K key) { +// if (cacheEntryRemovedListeners != null) { +// cacheEntryRemovedListeners.forEach(listener -> listener.onRemoved()) +// } + } + + private void notifyRemovedListeners(Set keys) { + + } + } diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java index a9b5fb7..4d81fad 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -348,11 +348,11 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { session.select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null); String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString(); - uow.cacheUpdate(cacheKey, w1); + uow.getCache().put(cacheKey, w1); // This should remove the object from the cache. session.delete(widget).where(widget::id, eq(key)).sync(uow); - uow.cacheDelete(cacheKey); + uow.getCache().remove(cacheKey); // This should fail to read from the cache. w3 = @@ -473,7 +473,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .sync(uow); String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key1.toString(); - uow.cacheUpdate(cacheKey, w1); + uow.getCache().put(cacheKey, w1); /* w2 = session.upsert(w1) .value(widget::a, RandomString.make(10)) @@ -507,11 +507,11 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .sync(uow); String cacheKey = MappingUtil.getTableName(Widget.class, false) + "." + key.toString(); - uow.cacheUpdate(cacheKey, w1); + uow.getCache().put(cacheKey, w1); // This should read from the cache and get the same instance of a Widget. w2 = session.select(widget).where(widget::id, eq(key)).single().sync(uow).orElse(null); - uow.cacheUpdate(cacheKey, w1); + uow.getCache().put(cacheKey, w1); uow.commit() .andThen(