Add notion of statement cache to UnitOfWork. Ignore call to useKeyspace when session isn't valid.

This commit is contained in:
Greg Burd 2018-01-17 12:38:33 -05:00
parent 1da822ce57
commit 26f41dab75
3 changed files with 41 additions and 3 deletions

View file

@ -24,6 +24,7 @@ import com.google.common.collect.TreeTraverser;
import java.io.Serializable; import java.io.Serializable;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -48,6 +49,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
private final HelenusSession session; private final HelenusSession session;
private final AbstractUnitOfWork<E> parent; private final AbstractUnitOfWork<E> parent;
private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create(); private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
private final Map<String, Object> statementCache = new ConcurrentHashMap<String, Object>();
protected String purpose; protected String purpose;
protected List<String> nestedPurposes = new ArrayList<String>(); protected List<String> nestedPurposes = new ArrayList<String>();
protected String info; protected String info;
@ -203,10 +205,23 @@ public abstract class AbstractUnitOfWork<E extends Exception>
} }
} }
@Override
public Optional<Object> cacheLookup(String key) {
AbstractUnitOfWork self = this;
do {
Object result = self.statementCache.get(key);
if (result != null) {
return result == deleted ? Optional.ofNullable(null) : Optional.of(result);
}
self = self.parent;
} while (self != null);
return Optional.empty();
}
@Override @Override
public Optional<Object> cacheLookup(List<Facet> facets) { public Optional<Object> cacheLookup(List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets); String tableName = CacheUtil.schemaName(facets);
Optional<Object> result = Optional.empty(); Optional<Object> result = Optional.empty();
for (Facet facet : facets) { for (Facet facet : facets) {
if (!facet.fixed()) { if (!facet.fixed()) {
String columnName = facet.name() + "==" + facet.value(); String columnName = facet.name() + "==" + facet.value();
@ -243,6 +258,16 @@ public abstract class AbstractUnitOfWork<E extends Exception>
return result; return result;
} }
@Override
public void cacheEvict(String key) {
statementCache.remove(key);
}
@Override
public void cacheDelete(String key) {
statementCache.replace(key, deleted);
}
@Override @Override
public List<Facet> cacheEvict(List<Facet> facets) { public List<Facet> cacheEvict(List<Facet> facets) {
Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets); Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets);
@ -279,6 +304,11 @@ public abstract class AbstractUnitOfWork<E extends Exception>
return facets; return facets;
} }
@Override
public Object cacheUpdate(String key, Object value) {
return statementCache.replace(key, value);
}
@Override @Override
public Object cacheUpdate(Object value, List<Facet> facets) { public Object cacheUpdate(Object value, List<Facet> facets) {
Object result = null; Object result = null;
@ -378,6 +408,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
} else { } else {
// Merge cache and statistics into parent if there is one. // Merge cache and statistics into parent if there is one.
parent.statementCache.putAll(statementCache);
parent.mergeCache(cache); parent.mergeCache(cache);
parent.addBatched(batch); parent.addBatched(batch);
if (purpose != null) { if (purpose != null) {

View file

@ -273,8 +273,10 @@ public final class SessionInitializer extends AbstractSessionOperations {
} }
public SessionInitializer use(String keyspace) { public SessionInitializer use(String keyspace) {
session.execute(SchemaUtil.use(keyspace, false)); if (session != null) {
this.usingKeyspace = keyspace; session.execute(SchemaUtil.use(keyspace, false));
this.usingKeyspace = keyspace;
}
return this; return this;
} }

View file

@ -61,12 +61,17 @@ public interface UnitOfWork<X extends Exception> extends AutoCloseable {
void addFuture(CompletableFuture<?> future); void addFuture(CompletableFuture<?> future);
Optional<Object> cacheLookup(String key);
Optional<Object> cacheLookup(List<Facet> facets); Optional<Object> cacheLookup(List<Facet> facets);
Object cacheUpdate(String key, Object value);
Object cacheUpdate(Object pojo, List<Facet> facets); Object cacheUpdate(Object pojo, List<Facet> facets);
void cacheEvict(String key);
List<Facet> cacheEvict(List<Facet> facets); List<Facet> cacheEvict(List<Facet> facets);
public void cacheDelete(String key);
String getPurpose(); String getPurpose();
UnitOfWork setPurpose(String purpose); UnitOfWork setPurpose(String purpose);