WIP: working toward storing entity instances in a session-local cache.

This commit is contained in:
Greg Burd 2017-08-02 12:08:29 -04:00
parent 18cfc85f45
commit 6b0daebb93
12 changed files with 303 additions and 45 deletions

View file

@ -11,8 +11,8 @@
</content> </content>
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-core:3.2.2" level="project" /> <orderEntry type="library" name="Maven: com.github.ben-manes.caffeine:caffeine:2.5.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.github.ben-manes.caffeine:caffeine:2.2.6" level="project" /> <orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-core:3.2.3" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-library:2.13.0-M1" level="project" /> <orderEntry type="library" name="Maven: org.scala-lang:scala-library:2.13.0-M1" level="project" />
<orderEntry type="library" name="Maven: com.datastax.cassandra:cassandra-driver-core:3.3.0" level="project" /> <orderEntry type="library" name="Maven: com.datastax.cassandra:cassandra-driver-core:3.3.0" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty-handler:4.0.47.Final" level="project" /> <orderEntry type="library" name="Maven: io.netty:netty-handler:4.0.47.Final" level="project" />
@ -20,7 +20,6 @@
<orderEntry type="library" name="Maven: io.netty:netty-common:4.0.47.Final" level="project" /> <orderEntry type="library" name="Maven: io.netty:netty-common:4.0.47.Final" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty-transport:4.0.47.Final" level="project" /> <orderEntry type="library" name="Maven: io.netty:netty-transport:4.0.47.Final" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty-codec:4.0.47.Final" level="project" /> <orderEntry type="library" name="Maven: io.netty:netty-codec:4.0.47.Final" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-core:3.2.2" level="project" />
<orderEntry type="library" name="Maven: com.github.jnr:jnr-ffi:2.0.7" level="project" /> <orderEntry type="library" name="Maven: com.github.jnr:jnr-ffi:2.0.7" level="project" />
<orderEntry type="library" name="Maven: com.github.jnr:jffi:1.2.10" level="project" /> <orderEntry type="library" name="Maven: com.github.jnr:jffi:1.2.10" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: com.github.jnr:jffi:native:1.2.10" level="project" /> <orderEntry type="library" scope="RUNTIME" name="Maven: com.github.jnr:jffi:native:1.2.10" level="project" />
@ -38,6 +37,8 @@
<orderEntry type="library" name="Maven: org.springframework:spring-core:4.3.10.RELEASE" level="project" /> <orderEntry type="library" name="Maven: org.springframework:spring-core:4.3.10.RELEASE" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" /> <orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:20.0" level="project" /> <orderEntry type="library" name="Maven: com.google.guava:guava:20.0" level="project" />
<orderEntry type="library" name="Maven: com.github.ben-manes.caffeine:caffeine:2.5.3" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-core:3.2.3" level="project" />
<orderEntry type="library" name="Maven: javax.validation:validation-api:2.0.0.CR3" level="project" /> <orderEntry type="library" name="Maven: javax.validation:validation-api:2.0.0.CR3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.13" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.13" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.anthemengineering.mojo:infer-maven-plugin:0.1.0" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: com.anthemengineering.mojo:infer-maven-plugin:0.1.0" level="project" />
@ -113,7 +114,6 @@
<orderEntry type="library" scope="TEST" name="Maven: org.fusesource:sigar:1.6.4" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: org.fusesource:sigar:1.6.4" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.eclipse.jdt.core.compiler:ecj:4.4.2" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: org.eclipse.jdt.core.compiler:ecj:4.4.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.caffinitas.ohc:ohc-core:0.4.4" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: org.caffinitas.ohc:ohc-core:0.4.4" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.github.ben-manes.caffeine:caffeine:2.2.6" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.jctools:jctools-core:1.2.1" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: org.jctools:jctools-core:1.2.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: commons-io:commons-io:2.5" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: commons-io:commons-io:2.5" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" /> <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" />

15
pom.xml
View file

@ -145,6 +145,21 @@
<version>20.0</version> <version>20.0</version>
</dependency> </dependency>
<!-- Caching -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.5.3</version>
</dependency>
<!-- Metrics -->
<!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.2.3</version>
</dependency>
<!-- Validation --> <!-- Validation -->
<dependency> <dependency>
<groupId>javax.validation</groupId> <groupId>javax.validation</groupId>

View file

@ -129,8 +129,7 @@ public abstract class AbstractSessionOperations {
} }
} }
public void cache(String key, Object value) { public abstract void cache(String key, Object value);
}
RuntimeException translateException(RuntimeException e) { RuntimeException translateException(RuntimeException e) {

View file

@ -15,20 +15,13 @@
*/ */
package net.helenus.core; package net.helenus.core;
import java.io.Closeable; import com.codahale.metrics.MetricRegistry;
import java.io.PrintStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import com.datastax.driver.core.*; import com.datastax.driver.core.*;
import com.google.common.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.github.benmanes.caffeine.cache.Caffeine;
import net.helenus.core.operation.*; import net.helenus.core.operation.*;
import net.helenus.core.reflect.HelenusPropertyNode; import net.helenus.core.reflect.HelenusPropertyNode;
import net.helenus.mapping.ColumnType;
import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.MappingUtil; import net.helenus.mapping.MappingUtil;
import net.helenus.mapping.value.*; import net.helenus.mapping.value.*;
@ -37,10 +30,15 @@ import net.helenus.support.Fun.Tuple1;
import net.helenus.support.Fun.Tuple2; import net.helenus.support.Fun.Tuple2;
import net.helenus.support.Fun.Tuple6; import net.helenus.support.Fun.Tuple6;
public final class HelenusSession extends AbstractSessionOperations implements Closeable { import java.io.Closeable;
import java.io.PrintStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
private final int MAX_CACHE_SIZE = 10000; public final class HelenusSession extends AbstractSessionOperations implements Closeable {
private final int MAX_CACHE_EXPIRE_SECONDS = 600;
private final Session session; private final Session session;
private final CodecRegistry registry; private final CodecRegistry registry;
@ -54,12 +52,13 @@ public final class HelenusSession extends AbstractSessionOperations implements C
private final RowColumnValueProvider valueProvider; private final RowColumnValueProvider valueProvider;
private final StatementColumnValuePreparer valuePreparer; private final StatementColumnValuePreparer valuePreparer;
private final Metadata metadata; private final Metadata metadata;
private final Cache<String, Object> sessionCache; private final MetricRegistry metricRegistry;
private final Cache<String, Object> sessionCache;
private UnitOfWork currentUnitOfWork; private UnitOfWork currentUnitOfWork;
HelenusSession(Session session, String usingKeyspace, CodecRegistry registry, boolean showCql, HelenusSession(Session session, String usingKeyspace, CodecRegistry registry, boolean showCql,
PrintStream printStream, SessionRepositoryBuilder sessionRepositoryBuilder, Executor executor, PrintStream printStream, SessionRepositoryBuilder sessionRepositoryBuilder, Executor executor,
boolean dropSchemaOnClose) { boolean dropSchemaOnClose, MetricRegistry metricRegistry) {
this.session = session; this.session = session;
this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry; this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry;
this.usingKeyspace = Objects.requireNonNull(usingKeyspace, this.usingKeyspace = Objects.requireNonNull(usingKeyspace,
@ -73,9 +72,16 @@ public final class HelenusSession extends AbstractSessionOperations implements C
this.valueProvider = new RowColumnValueProvider(this.sessionRepository); this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository); this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository);
this.metadata = session.getCluster().getMetadata(); this.metadata = session.getCluster().getMetadata();
this.sessionCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE) this.currentUnitOfWork = null;
.expireAfterAccess(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS).recordStats().build();
this.currentUnitOfWork = null; this.metricRegistry = metricRegistry;
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.maximumSize(10_000);
if (this.metricRegistry != null) {
cacheBuilder.recordStats(() -> new MetricsStatsCounter(metricRegistry, "helenus-session-cache"));
}
sessionCache = cacheBuilder.build();
} }
@Override @Override
@ -160,9 +166,13 @@ public final class HelenusSession extends AbstractSessionOperations implements C
} }
public void cache(String key, Object value) { public void cache(String key, Object value) {
sessionCache.put(key, value); // ttl sessionCache.put(key, value);
} }
public Object fetch(String key) {
return sessionCache.getIfPresent(key);
}
public <E> SelectOperation<E> select(Class<E> entityClass) { public <E> SelectOperation<E> select(Class<E> entityClass) {
Objects.requireNonNull(entityClass, "entityClass is empty"); Objects.requireNonNull(entityClass, "entityClass is empty");
@ -172,7 +182,21 @@ public final class HelenusSession extends AbstractSessionOperations implements C
return new SelectOperation<E>(this, entity, (r) -> { return new SelectOperation<E>(this, entity, (r) -> {
Map<String, Object> map = new ValueProviderMap(r, valueProvider, entity); Map<String, Object> map = new ValueProviderMap(r, valueProvider, entity);
return (E) Helenus.map(entityClass, map); E pojo = (E) Helenus.map(entityClass, map);
if (entity.isCacheable()) {
StringBuilder cacheKey = new StringBuilder();
entity.getOrderedProperties().stream().forEach(property -> {
ColumnType ct = property.getColumnType();
switch (ct) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
cacheKey.append(map.get(property.getPropertyName()).toString());
break;
}
});
cache(cacheKey.toString(), pojo);
}
return pojo;
}); });
} }

View file

@ -0,0 +1,109 @@
/*
* Copyright 2016 Ben Manes. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core;
import static java.util.Objects.requireNonNull;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.github.benmanes.caffeine.cache.stats.StatsCounter;
/**
* A {@link StatsCounter} instrumented with Dropwizard Metrics.
*
* @author ben.manes@gmail.com (Ben Manes)
*/
public final class MetricsStatsCounter implements StatsCounter {
private final Meter hitCount;
private final Meter missCount;
private final Meter loadSuccessCount;
private final Meter loadFailureCount;
private final Timer totalLoadTime;
private final Meter evictionCount;
private final Meter evictionWeight;
/**
* Constructs an instance for use by a single cache.
*
* @param registry the registry of metric instances
* @param metricsPrefix the prefix name for the metrics
*/
public MetricsStatsCounter(MetricRegistry registry, String metricsPrefix) {
requireNonNull(metricsPrefix);
hitCount = registry.meter(metricsPrefix + ".hits");
missCount = registry.meter(metricsPrefix + ".misses");
totalLoadTime = registry.timer(metricsPrefix + ".loads");
loadSuccessCount = registry.meter(metricsPrefix + ".loads-success");
loadFailureCount = registry.meter(metricsPrefix + ".loads-failure");
evictionCount = registry.meter(metricsPrefix + ".evictions");
evictionWeight = registry.meter(metricsPrefix + ".evictions-weight");
}
@Override
public void recordHits(int count) {
hitCount.mark(count);
}
@Override
public void recordMisses(int count) {
missCount.mark(count);
}
@Override
public void recordLoadSuccess(long loadTime) {
loadSuccessCount.mark();
totalLoadTime.update(loadTime, TimeUnit.NANOSECONDS);
}
@Override
public void recordLoadFailure(long loadTime) {
loadFailureCount.mark();
totalLoadTime.update(loadTime, TimeUnit.NANOSECONDS);
}
@Override
public void recordEviction() {
// This method is scheduled for removal in version 3.0 in favor of recordEviction(weight)
recordEviction(1);
}
@Override
public void recordEviction(int weight) {
evictionCount.mark();
evictionWeight.mark(weight);
}
@Override
public CacheStats snapshot() {
return new CacheStats(
hitCount.getCount(),
missCount.getCount(),
loadSuccessCount.getCount(),
loadFailureCount.getCount(),
totalLoadTime.getCount(),
evictionCount.getCount(),
evictionWeight.getCount());
}
@Override
public String toString() {
return snapshot().toString();
}
}

View file

@ -15,19 +15,9 @@
*/ */
package net.helenus.core; package net.helenus.core;
import java.io.IOException; import com.codahale.metrics.MetricRegistry;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.datastax.driver.core.*; import com.datastax.driver.core.*;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusEntityType; import net.helenus.mapping.HelenusEntityType;
import net.helenus.mapping.value.ColumnValuePreparer; import net.helenus.mapping.value.ColumnValuePreparer;
@ -35,10 +25,18 @@ import net.helenus.mapping.value.ColumnValueProvider;
import net.helenus.support.HelenusException; import net.helenus.support.HelenusException;
import net.helenus.support.PackageUtil; import net.helenus.support.PackageUtil;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
public final class SessionInitializer extends AbstractSessionOperations { public final class SessionInitializer extends AbstractSessionOperations {
private final Session session; private final Session session;
private CodecRegistry registry; private CodecRegistry registry;
private MetricRegistry metricRegistry;
private String usingKeyspace; private String usingKeyspace;
private boolean showCql = false; private boolean showCql = false;
private PrintStream printStream = System.out; private PrintStream printStream = System.out;
@ -60,6 +58,9 @@ public final class SessionInitializer extends AbstractSessionOperations {
this.sessionRepository = new SessionRepositoryBuilder(session); this.sessionRepository = new SessionRepositoryBuilder(session);
} }
@Override
public void cache(String key, Object value) { }
@Override @Override
public Session currentSession() { public Session currentSession() {
return session; return session;
@ -116,6 +117,12 @@ public final class SessionInitializer extends AbstractSessionOperations {
return this; return this;
} }
public SessionInitializer withMetricsRegistry(MetricRegistry metricRegistry) {
Objects.requireNonNull(metricRegistry, "empty registry");
this.metricRegistry = metricRegistry;
return this;
}
public SessionInitializer withCachingExecutor() { public SessionInitializer withCachingExecutor() {
this.executor = Executors.newCachedThreadPool(); this.executor = Executors.newCachedThreadPool();
return this; return this;
@ -206,7 +213,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
public synchronized HelenusSession get() { public synchronized HelenusSession get() {
initialize(); initialize();
return new HelenusSession(session, usingKeyspace, registry, showCql, printStream, sessionRepository, executor, return new HelenusSession(session, usingKeyspace, registry, showCql, printStream, sessionRepository, executor,
autoDdl == AutoDdl.CREATE_DROP); autoDdl == AutoDdl.CREATE_DROP, metricRegistry);
} }
private void initialize() { private void initialize() {

View file

@ -21,8 +21,7 @@ import java.util.List;
import net.helenus.core.*; import net.helenus.core.*;
public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>> public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>
extends extends AbstractStreamOperation<E, O> {
AbstractStreamOperation<E, O> {
protected List<Filter<?>> filters = null; protected List<Filter<?>> filters = null;
protected List<Filter<?>> ifFilters = null; protected List<Filter<?>> ifFilters = null;

View file

@ -74,6 +74,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
sessionOps.cache(getCacheKey(), result); sessionOps.cache(getCacheKey(), result);
} }
return result; return result;
} }
public ListenableFuture<E> async() { public ListenableFuture<E> async() {

View file

@ -21,6 +21,7 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.ResultSetFuture;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -64,10 +65,18 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
} }
public Stream<E> sync() { public Stream<E> sync() {
ListenableFuture<Stream<E>> future = async();
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String contents) {
//...process web site contents
}
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); @Override
public void onFailure(Throwable throwable) {
return transform(resultSet); log.error("Exception in task", throwable);
}
});
} }
public ListenableFuture<Stream<E>> async() { public ListenableFuture<Stream<E>> async() {

View file

@ -221,7 +221,15 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
return select; return select;
} }
@SuppressWarnings("unchecked") @Override
public Optional<E> sync() {
if (true) {
} else {
return super.sync();
}
}
@SuppressWarnings("unchecked")
@Override @Override
public Stream<E> transform(ResultSet resultSet) { public Stream<E> transform(ResultSet resultSet) {

View file

@ -0,0 +1,64 @@
package net.helenus.test.integration.core.cached;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.utils.UUIDs;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.operation.PreparedOperation;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
import net.helenus.test.integration.core.simple.User;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.*;
import static net.helenus.core.Query.eq;
import static net.helenus.core.Query.marker;
public class SessionCacheTest extends AbstractEmbeddedCassandraTest {
static HelenusSession session;
static Tree tree;
static Random rnd = new Random();
static Set<UUID> keys = new HashSet<UUID>();
@BeforeClass
public static void beforeTest() {
session = Helenus.init(getSession()).showCql().add(Tree.class).autoCreateDrop().showCql().get();
tree = Helenus.dsl(Tree.class, session.getMetadata());
PreparedOperation<ResultSet> insertOp =
session.insert()
.value(tree::id, marker())
.value(tree::name, marker())
.value(tree::age, marker())
.value(tree::height, marker())
.prepare();
for (int i = 0; i < 10; i++) {
UUID key = UUIDs.random();
keys.add(key);
insertOp.bind(key,
UUIDs.random().toString(),
rnd.nextInt(1000),
rnd.nextInt(5000))
.sync();
}
Assert.assertEquals(session.count(tree).sync().intValue(), 10);
}
@Test
public void cacheThingsTest() throws Exception {
keys.forEach(key -> {
Optional<Tree> aTree = session.select(Tree.class)
.where(tree::id, eq(key))
.single()
.sync();
Assert.assertTrue(aTree.isPresent());
Assert.assertEquals(session.fetch(key.toString()), aTree.get());
});
}
}

View file

@ -0,0 +1,23 @@
package net.helenus.test.integration.core.cached;
import net.helenus.core.annotation.Cacheable;
import net.helenus.mapping.annotation.Column;
import net.helenus.mapping.annotation.PartitionKey;
import net.helenus.mapping.annotation.Table;
import java.util.UUID;
@Table
@Cacheable
public interface Tree {
@PartitionKey
UUID id();
String name();
Integer age();
Integer height();
}