A few minor fixes.
This commit is contained in:
parent
48545c1e84
commit
01a458a7f6
5 changed files with 56 additions and 234 deletions
247
NOTES
247
NOTES
|
@ -1,172 +1,27 @@
|
|||
Operation/
|
||||
|-- AbstractStatementOperation
|
||||
| |-- AbstractOperation
|
||||
| | |-- AbstractFilterOperation
|
||||
| | | |-- CountOperation
|
||||
| | | |-- DeleteOperation
|
||||
| | | `-- UpdateOperation
|
||||
| | |-- BoundOperation
|
||||
| | `-- InsertOperation
|
||||
| |-- AbstractOptionalOperation
|
||||
| | |-- AbstractFilterOptionalOperation
|
||||
| | | |-- SelectFirstOperation
|
||||
| | | `-- SelectFirstTransformingOperation
|
||||
| | `-- BoundOptionalOperation
|
||||
| `-- AbstractStreamOperation
|
||||
| |-- AbstractFilterStreamOperation
|
||||
| | |-- SelectOperation
|
||||
| | `-- SelectTransformingOperation
|
||||
| `-- BoundStreamOperation
|
||||
|-- PreparedOperation
|
||||
|-- PreparedOptionalOperation
|
||||
`-- PreparedStreamOperation
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
--- Cache
|
||||
// `E` is the type of the Entity class or one of:
|
||||
// - ResultSet
|
||||
// - ArrayTuple{N}
|
||||
// - Count
|
||||
// `F` is the type argument passed to us from HelenusSession DSL and carried on via one of the
|
||||
// Operation classes, it is going to be one of:
|
||||
// - ResultSet
|
||||
// - ArrayTuple{N}
|
||||
// - or a type previously registered as a HelenusEntity.
|
||||
// In the form of a:
|
||||
// - Stream<?> or an
|
||||
// - Optional<?>
|
||||
//
|
||||
// Operation/
|
||||
// |-- AbstractStatementOperation
|
||||
// | |-- AbstractOperation
|
||||
// | | |-- AbstractFilterOperation
|
||||
// | | | |-- CountOperation
|
||||
// | | | |-- DeleteOperation
|
||||
// | | | `-- UpdateOperation
|
||||
// | | |-- BoundOperation
|
||||
// | | `-- InsertOperation
|
||||
// | |-- AbstractOptionalOperation
|
||||
// | | |-- AbstractFilterOptionalOperation
|
||||
// | | | |-- SelectFirstOperation
|
||||
// | | | `-- SelectFirstTransformingOperation
|
||||
// | | `-- BoundOptionalOperation
|
||||
// | `-- AbstractStreamOperation
|
||||
// | |-- AbstractFilterStreamOperation
|
||||
// | | |-- SelectOperation
|
||||
// | | `-- SelectTransformingOperation
|
||||
// | `-- BoundStreamOperation
|
||||
// |-- PreparedOperation
|
||||
// |-- PreparedOptionalOperation
|
||||
// `-- PreparedStreamOperation
|
||||
//
|
||||
// These all boil down to: Select, Update, Insert, Delete and Count
|
||||
//
|
||||
// -- Select:
|
||||
// 1) Select statements that contain all primary key information will be "distinct" and
|
||||
// result in a single value or no match.
|
||||
// If present, return cached entity otherwise execute query and cache result.
|
||||
//
|
||||
// 2) Otherwise the result is a set, possibly empty, of values that match.
|
||||
// When within a UOW:
|
||||
// If present, return the cached value(s) from the statement cache matching the query string.
|
||||
// Otherwise, execute query and cache the result in the statement cache and update/merge the
|
||||
// entites into the entity cache.
|
||||
// NOTE: When we read data from the database we augment the select clause with TTL and write time
|
||||
// stamps for all columns that record such information so as to be able to properlty expire
|
||||
// and merge values in the cache.
|
||||
//
|
||||
// -- Update:
|
||||
// Execute the database statement and then iff successs upsert the entity being updated into the
|
||||
// entity cache.
|
||||
//
|
||||
// -- Insert/Upsert:
|
||||
// Same as Update.
|
||||
//
|
||||
// -- Delete:
|
||||
// Same as update, only remove the cached value from all caches on success.
|
||||
//
|
||||
// -- Count:
|
||||
// If operating within a UOW lookup count in statement cache, if not present execute query and cache result.
|
||||
//
|
||||
|
||||
|
||||
if (delegate instanceof SelectOperation) {
|
||||
SelectOperation<E> op = (SelectOperation<E>) delegate;
|
||||
|
||||
// Determine if we are caching and if so where.
|
||||
AbstractCache<CacheKey, Set<E>> cache = delegate.getCache();
|
||||
boolean prepareStatementForCaching = cache != null;
|
||||
if (uow != null) {
|
||||
prepareStatementForCaching = true;
|
||||
cache = uow.<Set<E>>getCacheEnclosing(cache);
|
||||
}
|
||||
|
||||
// The delegate will provide the cache key becuase it will either be:
|
||||
// a) when distinct: the combination of the partition/cluster key columns
|
||||
// b) otherwise: the table name followed by the portion of the SQL statement that would form the WHERE clause
|
||||
CacheKey key = (cache == null) ? null : delegate.getCacheKey();
|
||||
if (key != null && cache != null) {
|
||||
Set<E> value = cache.get(key);
|
||||
if (value != null) {
|
||||
// Select will always return a Stream<E>
|
||||
// TODO(gburd): SelectTransforming... apply fn here?
|
||||
result = (E) value.stream();
|
||||
if (cacheHitCounter != null) {
|
||||
cacheHitCounter.inc();
|
||||
}
|
||||
if (log != null) {
|
||||
log.info("cache hit");
|
||||
}
|
||||
return result;
|
||||
} else {
|
||||
if (cacheMissCounter != null) {
|
||||
cacheMissCounter.inc();
|
||||
}
|
||||
if (log != null) {
|
||||
log.info("cache miss");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
if (cache != null) {
|
||||
Object obj = delegate.unwrap(result);
|
||||
if (obj != null) {
|
||||
cache.put(key, obj);
|
||||
}
|
||||
|
||||
delegate.<E>extract(result, key, cache);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
// TODO: first, ask the delegate for the cacheKey
|
||||
// if this is a SELECT query:
|
||||
// if not in cache build the statement, execute the future, cache the result, transform the result then cache the transformations
|
||||
// if INSERT/UPSERT/UPDATE
|
||||
// if DELETE
|
||||
// if COUNT
|
||||
----------------------------
|
||||
|
||||
@Override
|
||||
public CacheKey getCacheKey() {
|
||||
|
||||
List<String>keys = new ArrayList<>(filters.size());
|
||||
HelenusEntity entity = props.get(0).getEntity();
|
||||
|
||||
for (HelenusPropertyNode prop : props) {
|
||||
switch(prop.getProperty().getColumnType()) {
|
||||
case PARTITION_KEY:
|
||||
case CLUSTERING_COLUMN:
|
||||
|
||||
Filter filter = filters.get(prop.getProperty());
|
||||
if (filter != null) {
|
||||
keys.add(filter.toString());
|
||||
} else {
|
||||
// we're missing a part of the primary key, so we can't create a proper cache key
|
||||
return null;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// We've past the primary key components in this ordered list, so we're done building
|
||||
// the cache key.
|
||||
if (keys.size() > 0) {
|
||||
return new CacheKey(entity, Joiner.on(",").join(keys));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
---------------------------
|
||||
|
||||
// TODO(gburd): create a statement that matches one that wasn't prepared
|
||||
//String key =
|
||||
// "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString();
|
||||
|
@ -175,64 +30,6 @@
|
|||
//}
|
||||
|
||||
|
||||
------------------------
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.ResultSetFuture;
|
||||
import com.datastax.driver.core.Statement;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.RemovalListener;
|
||||
import com.google.common.cache.RemovalNotification;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class AbstractCache<K, V> {
|
||||
final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
public Cache<K, V> cache;
|
||||
|
||||
public AbstractCache() {
|
||||
RemovalListener<K, V> listener =
|
||||
new RemovalListener<K, V>() {
|
||||
@Override
|
||||
public void onRemoval(RemovalNotification<K, V> n) {
|
||||
if (n.wasEvicted()) {
|
||||
String cause = n.getCause().name();
|
||||
logger.info(cause);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
cache = CacheBuilder.newBuilder()
|
||||
.maximumSize(10_000)
|
||||
.expireAfterAccess(20, TimeUnit.MINUTES)
|
||||
.weakKeys()
|
||||
.softValues()
|
||||
.removalListener(listener)
|
||||
.build();
|
||||
}
|
||||
|
||||
V get(K key) {
|
||||
return cache.getIfPresent(key);
|
||||
}
|
||||
|
||||
void put(K key, V value) {
|
||||
cache.put(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
------------------------------------------------------------------------------------------------
|
||||
|
||||
cache entites (2 methods) marked @Cacheable
|
||||
cache entites in txn context
|
||||
cache results when .cache() chained before .{a}sync() call, return a EvictableCacheItem<E> that has an .evict() method
|
||||
fix txn .andThen() chains
|
||||
|
||||
|
||||
|
||||
|
||||
primitive types have default values, (e.g. boolean, int, ...) but primative wrapper classes do not and can be null (e.g. Boolean, Integer, ...)
|
||||
|
|
|
@ -19,15 +19,36 @@ package net.helenus.core.cache;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.RemovalListener;
|
||||
import com.google.common.cache.RemovalNotification;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public interface SessionCache<K, V> {
|
||||
|
||||
static final Logger LOG = LoggerFactory.getLogger(SessionCache.class);
|
||||
|
||||
static <K, V> SessionCache<K, V> defaultCache() {
|
||||
int MAX_CACHE_SIZE = 10000;
|
||||
int MAX_CACHE_EXPIRE_SECONDS = 600;
|
||||
return new GuavaCache<K, V>(CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE)
|
||||
.expireAfterAccess(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS)
|
||||
.expireAfterWrite(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).recordStats().build());
|
||||
GuavaCache<K, V> cache;
|
||||
RemovalListener<K, V> listener =
|
||||
new RemovalListener<K, V>() {
|
||||
@Override
|
||||
public void onRemoval(RemovalNotification<K, V> n) {
|
||||
if (n.wasEvicted()) {
|
||||
String cause = n.getCause().name();
|
||||
LOG.info(cause);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
cache = new GuavaCache<K, V>(CacheBuilder.newBuilder()
|
||||
.maximumSize(25_000)
|
||||
.expireAfterAccess(5, TimeUnit.MINUTES)
|
||||
.softValues()
|
||||
.removalListener(listener)
|
||||
.build());
|
||||
|
||||
return cache;
|
||||
}
|
||||
|
||||
void invalidate(K key);
|
||||
|
|
|
@ -709,6 +709,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
|
|||
cacheUpdate(uow, result, bindFacetValues());
|
||||
} else if (pojo != null) {
|
||||
cacheUpdate(uow, (E) pojo, bindFacetValues());
|
||||
return (E) pojo;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.lang.reflect.Constructor;
|
|||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -94,6 +95,9 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
|
|||
if (otherObj instanceof MapExportable && src.equals(((MapExportable) otherObj).toMap())) {
|
||||
return true;
|
||||
}
|
||||
if (src instanceof MapExportable && otherObj.equals(((MapExportable) src).toMap())) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -122,7 +126,7 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
|
|||
}
|
||||
|
||||
if (MapExportable.TO_MAP_METHOD.equals(methodName)) {
|
||||
return src; // return Collections.unmodifiableMap(src);
|
||||
return src; //Collections.unmodifiableMap(src);
|
||||
}
|
||||
|
||||
Object value = src.get(methodName);
|
||||
|
|
|
@ -156,9 +156,8 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
|
|||
.sync(uow).orElse(null);
|
||||
Assert.assertEquals(w1, w2);
|
||||
|
||||
// This should remove the object from the cache.
|
||||
//TODO(gburd): w3 = session.
|
||||
session.<Widget>update(w2)
|
||||
// This should remove the object from the session cache.
|
||||
w3 = session.<Widget>update(w2)
|
||||
.set(widget::name, "Bill")
|
||||
.where(widget::id, eq(key))
|
||||
.sync(uow);
|
||||
|
|
Loading…
Reference in a new issue