--- Cache
// `E` is the type of the Entity class or one of:
//   - ResultSet
//   - ArrayTuple{N}
//   - Count
// `F` is the type argument passed to us from HelenusSession DSL and carried on via one of the
// Operation classes, it is going to be one of:
//   - ResultSet
//   - ArrayTuple{N}
//   - or a type previously registered as a HelenusEntity.
// In the form of a:
//   - Stream or an
//   - Optional
//
// Operation/
// |-- AbstractStatementOperation
// |   |-- AbstractOperation
// |   |   |-- AbstractFilterOperation
// |   |   |   |-- CountOperation
// |   |   |   |-- DeleteOperation
// |   |   |   `-- UpdateOperation
// |   |   |-- BoundOperation
// |   |   `-- InsertOperation
// |   |-- AbstractOptionalOperation
// |   |   |-- AbstractFilterOptionalOperation
// |   |   |   |-- SelectFirstOperation
// |   |   |   `-- SelectFirstTransformingOperation
// |   |   `-- BoundOptionalOperation
// |   `-- AbstractStreamOperation
// |       |-- AbstractFilterStreamOperation
// |       |   |-- SelectOperation
// |       |   `-- SelectTransformingOperation
// |       `-- BoundStreamOperation
// |-- PreparedOperation
// |-- PreparedOptionalOperation
// `-- PreparedStreamOperation
//
// These all boil down to: Select, Update, Insert, Delete and Count
//
// -- Select:
//   1) Select statements that contain all primary key information will be "distinct" and
//      result in a single value or no match.
//      If present, return cached entity otherwise execute query and cache result.
//
//   2) Otherwise the result is a set, possibly empty, of values that match.
//      When within a UOW:
//        If present, return the cached value(s) from the statement cache matching the query string.
//        Otherwise, execute query and cache the result in the statement cache and update/merge the
//        entities into the entity cache.
//      NOTE: When we read data from the database we augment the select clause with TTL and write time
//            stamps for all columns that record such information so as to be able to properly expire
//            and merge values in the cache.
// // -- Update: // Execute the database statement and then iff successs upsert the entity being updated into the // entity cache. // // -- Insert/Upsert: // Same as Update. // // -- Delete: // Same as update, only remove the cached value from all caches on success. // // -- Count: // If operating within a UOW lookup count in statement cache, if not present execute query and cache result. // if (delegate instanceof SelectOperation) { SelectOperation op = (SelectOperation) delegate; // Determine if we are caching and if so where. AbstractCache> cache = delegate.getCache(); boolean prepareStatementForCaching = cache != null; if (uow != null) { prepareStatementForCaching = true; cache = uow.>getCacheEnclosing(cache); } // The delegate will provide the cache key becuase it will either be: // a) when distinct: the combination of the partition/cluster key columns // b) otherwise: the table name followed by the portion of the SQL statement that would form the WHERE clause CacheKey key = (cache == null) ? null : delegate.getCacheKey(); if (key != null && cache != null) { Set value = cache.get(key); if (value != null) { // Select will always return a Stream // TODO(gburd): SelectTransforming... apply fn here? 
result = (E) value.stream(); if (cacheHitCounter != null) { cacheHitCounter.inc(); } if (log != null) { log.info("cache hit"); } return result; } else { if (cacheMissCounter != null) { cacheMissCounter.inc(); } if (log != null) { log.info("cache miss"); } } } } if (cache != null) { Object obj = delegate.unwrap(result); if (obj != null) { cache.put(key, obj); } delegate.extract(result, key, cache); } } } // TODO: first, ask the delegate for the cacheKey // if this is a SELECT query: // if not in cache build the statement, execute the future, cache the result, transform the result then cache the transformations // if INSERT/UPSERT/UPDATE // if DELETE // if COUNT ---------------------------- @Override public CacheKey getCacheKey() { Listkeys = new ArrayList<>(filters.size()); HelenusEntity entity = props.get(0).getEntity(); for (HelenusPropertyNode prop : props) { switch(prop.getProperty().getColumnType()) { case PARTITION_KEY: case CLUSTERING_COLUMN: Filter filter = filters.get(prop.getProperty()); if (filter != null) { keys.add(filter.toString()); } else { // we're missing a part of the primary key, so we can't create a proper cache key return null; } break; default: // We've past the primary key components in this ordered list, so we're done building // the cache key. 
if (keys.size() > 0) { return new CacheKey(entity, Joiner.on(",").join(keys)); } return null; } } return null; } --------------------------- // TODO(gburd): create a statement that matches one that wasn't prepared //String key = // "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString(); //for (Object param : params) { // key = key.replaceFirst(Pattern.quote("?"), param.toString()); //} ------------------------ package net.helenus.core.operation; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Statement; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public abstract class AbstractCache { final Logger logger = LoggerFactory.getLogger(getClass()); public Cache cache; public AbstractCache() { RemovalListener listener = new RemovalListener() { @Override public void onRemoval(RemovalNotification n) { if (n.wasEvicted()) { String cause = n.getCause().name(); logger.info(cause); } } }; cache = CacheBuilder.newBuilder() .maximumSize(10_000) .expireAfterAccess(20, TimeUnit.MINUTES) .weakKeys() .softValues() .removalListener(listener) .build(); } V get(K key) { return cache.getIfPresent(key); } void put(K key, V value) { cache.put(key, value); } } ------------------------------------------------------------------------------------------------ cache entites (2 methods) marked @Cacheable cache entites in txn context cache results when .cache() chained before .{a}sync() call, return a EvictableCacheItem that has an .evict() method fix txn .andThen() chains primitive types have default values, (e.g. boolean, int, ...) but primative wrapper classes do not and can be null (e.g. 
Boolean, Integer, ...) create table wal { id timeuuid, follows timeuuid, read > write > primary key (id, follows) } begin: - insert into wal (timeuuid, parent timeuuid, // NOTE: Update operations have no meaning when they only contain primary key components, so // given that `properties` is ordered with the keys first if we find that the last element // is either a partition key or clustering column then we know we should just skip this operation. ColumnType ct = ((HelenusProperty) properties.toArray()[properties.size() - 1]).getColumnType(); if (ct != ColumnType.PARTITION_KEY && ct != ColumnType.CLUSTERING_COLUMN) { return; } public Stream sync() { ListenableFuture> future = async(); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(String contents) { //...process web site contents } @Override public void onFailure(Throwable throwable) { log.error("Exception in task", throwable); } }); } ------- private mergeCache(Map> private static Iterable concat( Iterable i1, Iterable i2) { return new Iterable() { public Iterator iterator() { return new Iterator() { Iterator listIterator = i1.iterator(); Boolean checkedHasNext; E nextValue; private boolean startTheSecond; void theNext() { if (listIterator.hasNext()) { checkedHasNext = true; nextValue = listIterator.next(); } else if (startTheSecond) checkedHasNext = false; else { startTheSecond = true; listIterator = i2.iterator(); theNext(); } } public boolean hasNext() { if (checkedHasNext == null) theNext(); return checkedHasNext; } public E next() { if (!hasNext()) throw new NoSuchElementException(); checkedHasNext = null; return nextValue; } public void remove() { listIterator.remove(); } }; } }; }