diff --git a/NOTES b/NOTES index bbacfc9..1b7148e 100644 --- a/NOTES +++ b/NOTES @@ -1,172 +1,27 @@ +Operation/ +|-- AbstractStatementOperation +| |-- AbstractOperation +| | |-- AbstractFilterOperation +| | | |-- CountOperation +| | | |-- DeleteOperation +| | | `-- UpdateOperation +| | |-- BoundOperation +| | `-- InsertOperation +| |-- AbstractOptionalOperation +| | |-- AbstractFilterOptionalOperation +| | | |-- SelectFirstOperation +| | | `-- SelectFirstTransformingOperation +| | `-- BoundOptionalOperation +| `-- AbstractStreamOperation +| |-- AbstractFilterStreamOperation +| | |-- SelectOperation +| | `-- SelectTransformingOperation +| `-- BoundStreamOperation +|-- PreparedOperation +|-- PreparedOptionalOperation +`-- PreparedStreamOperation - - - ---- Cache - // `E` is the type of the Entity class or one of: - // - ResultSet - // - ArrayTuple{N} - // - Count - // `F` is the type argument passed to us from HelenusSession DSL and carried on via one of the - // Operation classes, it is going to be one of: - // - ResultSet - // - ArrayTuple{N} - // - or a type previously registered as a HelenusEntity. - // In the form of a: - // - Stream or an - // - Optional - // - // Operation/ - // |-- AbstractStatementOperation - // | |-- AbstractOperation - // | | |-- AbstractFilterOperation - // | | | |-- CountOperation - // | | | |-- DeleteOperation - // | | | `-- UpdateOperation - // | | |-- BoundOperation - // | | `-- InsertOperation - // | |-- AbstractOptionalOperation - // | | |-- AbstractFilterOptionalOperation - // | | | |-- SelectFirstOperation - // | | | `-- SelectFirstTransformingOperation - // | | `-- BoundOptionalOperation - // | `-- AbstractStreamOperation - // | |-- AbstractFilterStreamOperation - // | | |-- SelectOperation - // | | `-- SelectTransformingOperation - // | `-- BoundStreamOperation - // |-- PreparedOperation - // |-- PreparedOptionalOperation - // `-- PreparedStreamOperation - // - // These all boil down to: Select, Update, Insert, Delete and Count - // - // -- Select: - // 1) Select statements that contain all primary key information will be "distinct" and - // result in a single value or no match. - // If present, return cached entity otherwise execute query and cache result. - // - // 2) Otherwise the result is a set, possibly empty, of values that match. - // When within a UOW: - // If present, return the cached value(s) from the statement cache matching the query string. - // Otherwise, execute query and cache the result in the statement cache and update/merge the - // entites into the entity cache. - // NOTE: When we read data from the database we augment the select clause with TTL and write time - // stamps for all columns that record such information so as to be able to properlty expire - // and merge values in the cache. - // - // -- Update: - // Execute the database statement and then iff successs upsert the entity being updated into the - // entity cache. - // - // -- Insert/Upsert: - // Same as Update. - // - // -- Delete: - // Same as update, only remove the cached value from all caches on success. - // - // -- Count: - // If operating within a UOW lookup count in statement cache, if not present execute query and cache result. - // - - - if (delegate instanceof SelectOperation) { - SelectOperation op = (SelectOperation) delegate; - - // Determine if we are caching and if so where. - AbstractCache> cache = delegate.getCache(); - boolean prepareStatementForCaching = cache != null; - if (uow != null) { - prepareStatementForCaching = true; - cache = uow.>getCacheEnclosing(cache); - } - - // The delegate will provide the cache key becuase it will either be: - // a) when distinct: the combination of the partition/cluster key columns - // b) otherwise: the table name followed by the portion of the SQL statement that would form the WHERE clause - CacheKey key = (cache == null) ? null : delegate.getCacheKey(); - if (key != null && cache != null) { - Set value = cache.get(key); - if (value != null) { - // Select will always return a Stream - // TODO(gburd): SelectTransforming... apply fn here? - result = (E) value.stream(); - if (cacheHitCounter != null) { - cacheHitCounter.inc(); - } - if (log != null) { - log.info("cache hit"); - } - return result; - } else { - if (cacheMissCounter != null) { - cacheMissCounter.inc(); - } - if (log != null) { - log.info("cache miss"); - } - } - } - } - - - - if (cache != null) { - Object obj = delegate.unwrap(result); - if (obj != null) { - cache.put(key, obj); - } - - delegate.extract(result, key, cache); - } - } - - } - - - -// TODO: first, ask the delegate for the cacheKey -// if this is a SELECT query: -// if not in cache build the statement, execute the future, cache the result, transform the result then cache the transformations -// if INSERT/UPSERT/UPDATE -// if DELETE -// if COUNT ----------------------------- - - @Override - public CacheKey getCacheKey() { - - Listkeys = new ArrayList<>(filters.size()); - HelenusEntity entity = props.get(0).getEntity(); - - for (HelenusPropertyNode prop : props) { - switch(prop.getProperty().getColumnType()) { - case PARTITION_KEY: - case CLUSTERING_COLUMN: - - Filter filter = filters.get(prop.getProperty()); - if (filter != null) { - keys.add(filter.toString()); - } else { - // we're missing a part of the primary key, so we can't create a proper cache key - return null; - } - break; - default: - // We've past the primary key components in this ordered list, so we're done building - // the cache key. - if (keys.size() > 0) { - return new CacheKey(entity, Joiner.on(",").join(keys)); - } - return null; - } - } - return null; - } - ---------------------------- - // TODO(gburd): create a statement that matches one that wasn't prepared //String key = // "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString(); @@ -175,64 +30,6 @@ //} ------------------------- -package net.helenus.core.operation; - -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Statement; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -public abstract class AbstractCache { - final Logger logger = LoggerFactory.getLogger(getClass()); - public Cache cache; - - public AbstractCache() { - RemovalListener listener = - new RemovalListener() { - @Override - public void onRemoval(RemovalNotification n) { - if (n.wasEvicted()) { - String cause = n.getCause().name(); - logger.info(cause); - } - } - }; - - cache = CacheBuilder.newBuilder() - .maximumSize(10_000) - .expireAfterAccess(20, TimeUnit.MINUTES) - .weakKeys() - .softValues() - .removalListener(listener) - .build(); - } - - V get(K key) { - return cache.getIfPresent(key); - } - - void put(K key, V value) { - cache.put(key, value); - } -} - ------------------------------------------------------------------------------------------------- - -cache entites (2 methods) marked @Cacheable -cache entites in txn context -cache results when .cache() chained before .{a}sync() call, return a EvictableCacheItem that has an .evict() method -fix txn .andThen() chains - - primitive types have default values, (e.g. boolean, int, ...) but primative wrapper classes do not and can be null (e.g. Boolean, Integer, ...) diff --git a/src/main/java/net/helenus/core/cache/SessionCache.java b/src/main/java/net/helenus/core/cache/SessionCache.java index 44717e0..ec2d73b 100644 --- a/src/main/java/net/helenus/core/cache/SessionCache.java +++ b/src/main/java/net/helenus/core/cache/SessionCache.java @@ -19,15 +19,36 @@ package net.helenus.core.cache; import java.util.concurrent.TimeUnit; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public interface SessionCache { + static final Logger LOG = LoggerFactory.getLogger(SessionCache.class); + static SessionCache defaultCache() { - int MAX_CACHE_SIZE = 10000; - int MAX_CACHE_EXPIRE_SECONDS = 600; - return new GuavaCache(CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE) - .expireAfterAccess(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS) - .expireAfterWrite(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).recordStats().build()); + GuavaCache cache; + RemovalListener listener = + new RemovalListener() { + @Override + public void onRemoval(RemovalNotification n) { + if (n.wasEvicted()) { + String cause = n.getCause().name(); + LOG.info(cause); + } + } + }; + + cache = new GuavaCache(CacheBuilder.newBuilder() + .maximumSize(25_000) + .expireAfterAccess(5, TimeUnit.MINUTES) + .softValues() + .removalListener(listener) + .build()); + + return cache; } void invalidate(K key); diff --git a/src/main/java/net/helenus/core/operation/UpdateOperation.java b/src/main/java/net/helenus/core/operation/UpdateOperation.java index aa65bae..80b3b9c 100644 --- a/src/main/java/net/helenus/core/operation/UpdateOperation.java +++ b/src/main/java/net/helenus/core/operation/UpdateOperation.java @@ -709,6 +709,7 @@ public final class UpdateOperation extends AbstractFilterOperation implements InvocationHandler, Serializab if (otherObj instanceof MapExportable && src.equals(((MapExportable) otherObj).toMap())) { return true; } + if (src instanceof MapExportable && otherObj.equals(((MapExportable) src).toMap())) { + return true; + } return false; } @@ -122,7 +126,7 @@ public class MapperInvocationHandler implements InvocationHandler, Serializab } if (MapExportable.TO_MAP_METHOD.equals(methodName)) { - return src; // return Collections.unmodifiableMap(src); + return src; //Collections.unmodifiableMap(src); } Object value = src.get(methodName); diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java index 2369393..254512a 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -156,9 +156,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .sync(uow).orElse(null); Assert.assertEquals(w1, w2); - // This should remove the object from the cache. - //TODO(gburd): w3 = session. - session.update(w2) + // This should remove the object from the session cache. + w3 = session.update(w2) .set(widget::name, "Bill") .where(widget::id, eq(key)) .sync(uow);