From 6b0daebb93bf7d69677b2f29c21b1008adabc8a9 Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Wed, 2 Aug 2017 12:08:29 -0400 Subject: [PATCH] WIP: working toward storing entity instances in a session-local cache. --- helenus-core.iml | 8 +- pom.xml | 15 +++ .../core/AbstractSessionOperations.java | 3 +- .../java/net/helenus/core/HelenusSession.java | 66 +++++++---- .../net/helenus/core/MetricsStatsCounter.java | 109 ++++++++++++++++++ .../net/helenus/core/SessionInitializer.java | 31 +++-- .../AbstractFilterStreamOperation.java | 3 +- .../core/operation/AbstractOperation.java | 1 + .../operation/AbstractStreamOperation.java | 15 ++- .../core/operation/SelectOperation.java | 10 +- .../core/cached/SessionCacheTest.java | 64 ++++++++++ .../test/integration/core/cached/Tree.java | 23 ++++ 12 files changed, 303 insertions(+), 45 deletions(-) create mode 100644 src/main/java/net/helenus/core/MetricsStatsCounter.java create mode 100644 src/test/java/net/helenus/test/integration/core/cached/SessionCacheTest.java create mode 100644 src/test/java/net/helenus/test/integration/core/cached/Tree.java diff --git a/helenus-core.iml b/helenus-core.iml index 77f2a38..241dc0e 100644 --- a/helenus-core.iml +++ b/helenus-core.iml @@ -11,8 +11,8 @@ - - + + @@ -20,7 +20,6 @@ - @@ -38,6 +37,8 @@ + + @@ -113,7 +114,6 @@ - diff --git a/pom.xml b/pom.xml index 89abb41..df6db78 100644 --- a/pom.xml +++ b/pom.xml @@ -145,6 +145,21 @@ 20.0 + + + com.github.ben-manes.caffeine + caffeine + 2.5.3 + + + + + + io.dropwizard.metrics + metrics-core + 3.2.3 + + javax.validation diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index 36a5145..ffd1957 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -129,8 +129,7 @@ public abstract class AbstractSessionOperations { } } - public void cache(String key, Object value) { - } + public abstract void cache(String key, Object value); RuntimeException translateException(RuntimeException e) { diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index 15cff2b..c84c3ef 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -15,20 +15,13 @@ */ package net.helenus.core; -import java.io.Closeable; -import java.io.PrintStream; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - +import com.codahale.metrics.MetricRegistry; import com.datastax.driver.core.*; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; - +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import net.helenus.core.operation.*; import net.helenus.core.reflect.HelenusPropertyNode; +import net.helenus.mapping.ColumnType; import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.MappingUtil; import net.helenus.mapping.value.*; @@ -37,10 +30,15 @@ import net.helenus.support.Fun.Tuple1; import net.helenus.support.Fun.Tuple2; import net.helenus.support.Fun.Tuple6; -public final class HelenusSession extends AbstractSessionOperations implements Closeable { +import java.io.Closeable; +import java.io.PrintStream; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; - private final int MAX_CACHE_SIZE = 10000; - private final int MAX_CACHE_EXPIRE_SECONDS = 600; +public final class HelenusSession extends AbstractSessionOperations implements Closeable { private final Session session; private final CodecRegistry registry; @@ -54,12 +52,13 @@ public final class HelenusSession extends AbstractSessionOperations implements C private final RowColumnValueProvider valueProvider; private final StatementColumnValuePreparer valuePreparer; private final Metadata metadata; - private final Cache sessionCache; + private final MetricRegistry metricRegistry; + private final Cache sessionCache; private UnitOfWork currentUnitOfWork; HelenusSession(Session session, String usingKeyspace, CodecRegistry registry, boolean showCql, PrintStream printStream, SessionRepositoryBuilder sessionRepositoryBuilder, Executor executor, - boolean dropSchemaOnClose) { + boolean dropSchemaOnClose, MetricRegistry metricRegistry) { this.session = session; this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry; this.usingKeyspace = Objects.requireNonNull(usingKeyspace, @@ -73,9 +72,16 @@ public final class HelenusSession extends AbstractSessionOperations implements C this.valueProvider = new RowColumnValueProvider(this.sessionRepository); this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository); this.metadata = session.getCluster().getMetadata(); - this.sessionCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE) - .expireAfterAccess(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).recordStats().build(); - this.currentUnitOfWork = null; + this.currentUnitOfWork = null; + + this.metricRegistry = metricRegistry; + Caffeine cacheBuilder = Caffeine.newBuilder() + .expireAfterWrite(5, TimeUnit.MINUTES) + .maximumSize(10_000); + if (this.metricRegistry != null) { + cacheBuilder.recordStats(() -> new MetricsStatsCounter(metricRegistry, "helenus-session-cache")); + } + sessionCache = cacheBuilder.build(); } @Override @@ -160,9 +166,13 @@ public final class HelenusSession extends AbstractSessionOperations implements C } public void cache(String key, Object value) { - sessionCache.put(key, value); // ttl + sessionCache.put(key, value); } + public Object fetch(String key) { + return sessionCache.getIfPresent(key); + } + public SelectOperation select(Class entityClass) { Objects.requireNonNull(entityClass, "entityClass is empty"); @@ -172,7 +182,21 @@ public final class HelenusSession extends AbstractSessionOperations implements C return new SelectOperation(this, entity, (r) -> { Map map = new ValueProviderMap(r, valueProvider, entity); - return (E) Helenus.map(entityClass, map); + E pojo = (E) Helenus.map(entityClass, map); + if (entity.isCacheable()) { + StringBuilder cacheKey = new StringBuilder(); + entity.getOrderedProperties().stream().forEach(property -> { + ColumnType ct = property.getColumnType(); + switch (ct) { + case PARTITION_KEY: + case CLUSTERING_COLUMN: + cacheKey.append(map.get(property.getPropertyName()).toString()); + break; + } + }); + cache(cacheKey.toString(), pojo); + } + return pojo; }); } diff --git a/src/main/java/net/helenus/core/MetricsStatsCounter.java b/src/main/java/net/helenus/core/MetricsStatsCounter.java new file mode 100644 index 0000000..927d2b5 --- /dev/null +++ b/src/main/java/net/helenus/core/MetricsStatsCounter.java @@ -0,0 +1,109 @@ +/* + * Copyright 2016 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.core; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.TimeUnit; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.github.benmanes.caffeine.cache.stats.StatsCounter; + +/** + * A {@link StatsCounter} instrumented with Dropwizard Metrics. + * + * @author ben.manes@gmail.com (Ben Manes) + */ +public final class MetricsStatsCounter implements StatsCounter { + private final Meter hitCount; + private final Meter missCount; + private final Meter loadSuccessCount; + private final Meter loadFailureCount; + private final Timer totalLoadTime; + private final Meter evictionCount; + private final Meter evictionWeight; + + /** + * Constructs an instance for use by a single cache. + * + * @param registry the registry of metric instances + * @param metricsPrefix the prefix name for the metrics + */ + public MetricsStatsCounter(MetricRegistry registry, String metricsPrefix) { + requireNonNull(metricsPrefix); + hitCount = registry.meter(metricsPrefix + ".hits"); + missCount = registry.meter(metricsPrefix + ".misses"); + totalLoadTime = registry.timer(metricsPrefix + ".loads"); + loadSuccessCount = registry.meter(metricsPrefix + ".loads-success"); + loadFailureCount = registry.meter(metricsPrefix + ".loads-failure"); + evictionCount = registry.meter(metricsPrefix + ".evictions"); + evictionWeight = registry.meter(metricsPrefix + ".evictions-weight"); + } + + @Override + public void recordHits(int count) { + hitCount.mark(count); + } + + @Override + public void recordMisses(int count) { + missCount.mark(count); + } + + @Override + public void recordLoadSuccess(long loadTime) { + loadSuccessCount.mark(); + totalLoadTime.update(loadTime, TimeUnit.NANOSECONDS); + } + + @Override + public void recordLoadFailure(long loadTime) { + loadFailureCount.mark(); + totalLoadTime.update(loadTime, TimeUnit.NANOSECONDS); + } + + @Override + public void recordEviction() { + // This method is scheduled for removal in version 3.0 in favor of recordEviction(weight) + recordEviction(1); + } + + @Override + public void recordEviction(int weight) { + evictionCount.mark(); + evictionWeight.mark(weight); + } + + @Override + public CacheStats snapshot() { + return new CacheStats( + hitCount.getCount(), + missCount.getCount(), + loadSuccessCount.getCount(), + loadFailureCount.getCount(), + totalLoadTime.getCount(), + evictionCount.getCount(), + evictionWeight.getCount()); + } + + @Override + public String toString() { + return snapshot().toString(); + } +} diff --git a/src/main/java/net/helenus/core/SessionInitializer.java b/src/main/java/net/helenus/core/SessionInitializer.java index 04470f5..4161087 100644 --- a/src/main/java/net/helenus/core/SessionInitializer.java +++ b/src/main/java/net/helenus/core/SessionInitializer.java @@ -15,19 +15,9 @@ */ package net.helenus.core; -import java.io.IOException; -import java.io.PrintStream; -import java.util.*; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.function.Consumer; -import java.util.stream.Collector; -import java.util.stream.Collectors; -import java.util.stream.Stream; - +import com.codahale.metrics.MetricRegistry; import com.datastax.driver.core.*; import com.google.common.util.concurrent.MoreExecutors; - import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.HelenusEntityType; import net.helenus.mapping.value.ColumnValuePreparer; @@ -35,10 +25,18 @@ import net.helenus.mapping.value.ColumnValueProvider; import net.helenus.support.HelenusException; import net.helenus.support.PackageUtil; +import java.io.IOException; +import java.io.PrintStream; +import java.util.*; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + public final class SessionInitializer extends AbstractSessionOperations { private final Session session; private CodecRegistry registry; + private MetricRegistry metricRegistry; private String usingKeyspace; private boolean showCql = false; private PrintStream printStream = System.out; @@ -60,6 +58,9 @@ public final class SessionInitializer extends AbstractSessionOperations { this.sessionRepository = new SessionRepositoryBuilder(session); } + @Override + public void cache(String key, Object value) { } + @Override public Session currentSession() { return session; @@ -116,6 +117,12 @@ public final class SessionInitializer extends AbstractSessionOperations { return this; } + public SessionInitializer withMetricsRegistry(MetricRegistry metricRegistry) { + Objects.requireNonNull(metricRegistry, "empty registry"); + this.metricRegistry = metricRegistry; + return this; + } + public SessionInitializer withCachingExecutor() { this.executor = Executors.newCachedThreadPool(); return this; @@ -206,7 +213,7 @@ public final class SessionInitializer extends AbstractSessionOperations { public synchronized HelenusSession get() { initialize(); return new HelenusSession(session, usingKeyspace, registry, showCql, printStream, sessionRepository, executor, - autoDdl == AutoDdl.CREATE_DROP); + autoDdl == AutoDdl.CREATE_DROP, metricRegistry); } private void initialize() { diff --git a/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java index 65e390d..a5fbc27 100644 --- a/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java @@ -21,8 +21,7 @@ import java.util.List; import net.helenus.core.*; public abstract class AbstractFilterStreamOperation> - extends - AbstractStreamOperation { + extends AbstractStreamOperation { protected List> filters = null; protected List> ifFilters = null; diff --git a/src/main/java/net/helenus/core/operation/AbstractOperation.java b/src/main/java/net/helenus/core/operation/AbstractOperation.java index 64d7523..eb2728d 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOperation.java @@ -74,6 +74,7 @@ public abstract class AbstractOperation> ex sessionOps.cache(getCacheKey(), result); } return result; + } public ListenableFuture async() { diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index dccb7ff..dff2936 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -21,6 +21,7 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.google.common.base.Function; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -64,10 +65,18 @@ public abstract class AbstractStreamOperation sync() { + ListenableFuture> future = async(); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(String contents) { + //...process web site contents + } - ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); - - return transform(resultSet); + @Override + public void onFailure(Throwable throwable) { + log.error("Exception in task", throwable); + } + }); } public ListenableFuture> async() { diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index 4df5583..613b4b4 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -221,7 +221,15 @@ public final class SelectOperation extends AbstractFilterStreamOperation sync() { + if (true) { + } else { + return super.sync(); + } + } + + @SuppressWarnings("unchecked") @Override public Stream transform(ResultSet resultSet) { diff --git a/src/test/java/net/helenus/test/integration/core/cached/SessionCacheTest.java b/src/test/java/net/helenus/test/integration/core/cached/SessionCacheTest.java new file mode 100644 index 0000000..857503d --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/cached/SessionCacheTest.java @@ -0,0 +1,64 @@ +package net.helenus.test.integration.core.cached; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.utils.UUIDs; +import net.helenus.core.Helenus; +import net.helenus.core.HelenusSession; +import net.helenus.core.operation.PreparedOperation; +import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest; +import net.helenus.test.integration.core.simple.User; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.*; + +import static net.helenus.core.Query.eq; +import static net.helenus.core.Query.marker; + +public class SessionCacheTest extends AbstractEmbeddedCassandraTest { + + static HelenusSession session; + static Tree tree; + + static Random rnd = new Random(); + static Set keys = new HashSet(); + + @BeforeClass + public static void beforeTest() { + session = Helenus.init(getSession()).showCql().add(Tree.class).autoCreateDrop().showCql().get(); + tree = Helenus.dsl(Tree.class, session.getMetadata()); + + PreparedOperation insertOp = + session.insert() + .value(tree::id, marker()) + .value(tree::name, marker()) + .value(tree::age, marker()) + .value(tree::height, marker()) + .prepare(); + + for (int i = 0; i < 10; i++) { + UUID key = UUIDs.random(); + keys.add(key); + insertOp.bind(key, + UUIDs.random().toString(), + rnd.nextInt(1000), + rnd.nextInt(5000)) + .sync(); + } + Assert.assertEquals(session.count(tree).sync().intValue(), 10); + } + + @Test + public void cacheThingsTest() throws Exception { + keys.forEach(key -> { + Optional aTree = session.select(Tree.class) + .where(tree::id, eq(key)) + .single() + .sync(); + Assert.assertTrue(aTree.isPresent()); + Assert.assertEquals(session.fetch(key.toString()), aTree.get()); + }); + } + +} diff --git a/src/test/java/net/helenus/test/integration/core/cached/Tree.java b/src/test/java/net/helenus/test/integration/core/cached/Tree.java new file mode 100644 index 0000000..23d661a --- /dev/null +++ b/src/test/java/net/helenus/test/integration/core/cached/Tree.java @@ -0,0 +1,23 @@ +package net.helenus.test.integration.core.cached; + +import net.helenus.core.annotation.Cacheable; +import net.helenus.mapping.annotation.Column; +import net.helenus.mapping.annotation.PartitionKey; +import net.helenus.mapping.annotation.Table; + +import java.util.UUID; + +@Table +@Cacheable +public interface Tree { + + @PartitionKey + UUID id(); + + String name(); + + Integer age(); + + Integer height(); + +}