Improved support for batched statements.
This commit is contained in:
parent
a198989a76
commit
5570a97dff
14 changed files with 228 additions and 88 deletions
|
@ -18,6 +18,7 @@ package net.helenus.core;
|
|||
import static net.helenus.core.HelenusSession.deleted;
|
||||
|
||||
import com.datastax.driver.core.BatchStatement;
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.diffplug.common.base.Errors;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.HashBasedTable;
|
||||
|
@ -25,10 +26,12 @@ import com.google.common.collect.Table;
|
|||
import com.google.common.collect.TreeTraverser;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Collectors;
|
||||
import net.helenus.core.cache.CacheUtil;
|
||||
import net.helenus.core.cache.Facet;
|
||||
import net.helenus.core.operation.AbstractOperation;
|
||||
import net.helenus.core.operation.BatchOperation;
|
||||
import net.helenus.support.Either;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -56,7 +59,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
|
|||
private boolean aborted = false;
|
||||
private boolean committed = false;
|
||||
private long committedAt = 0L;
|
||||
private List<AbstractOperation<?, ?>> operations = new ArrayList<AbstractOperation<?, ?>>();
|
||||
private BatchOperation batch;
|
||||
|
||||
protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork<E> parent) {
|
||||
Objects.requireNonNull(session, "containing session cannot be null");
|
||||
|
@ -269,7 +272,10 @@ public abstract class AbstractUnitOfWork<E extends Exception>
|
|||
}
|
||||
|
||||
public void batch(AbstractOperation s) {
|
||||
operations.add(s);
|
||||
if (batch == null) {
|
||||
batch = new BatchOperation(session);
|
||||
}
|
||||
batch.add(s);
|
||||
}
|
||||
|
||||
private Iterator<AbstractUnitOfWork<E>> getChildNodes() {
|
||||
|
@ -282,17 +288,11 @@ public abstract class AbstractUnitOfWork<E extends Exception>
|
|||
* @return a function from which to chain work that only happens when commit is successful
|
||||
* @throws E when the work overlaps with other concurrent writers.
|
||||
*/
|
||||
public PostCommitFunction<Void, Void> commit() throws E {
|
||||
public PostCommitFunction<Void, Void> commit() throws E, TimeoutException {
|
||||
|
||||
if (operations != null && operations.size() > 0) {
|
||||
if (parent == null) {
|
||||
BatchStatement batch = new BatchStatement();
|
||||
batch.addAll(operations.stream().map(o -> o.buildStatement(false)).collect(Collectors.toList()));
|
||||
batch.setConsistencyLevel(session.getDefaultConsistencyLevel());
|
||||
session.getSession().execute(batch);
|
||||
} else {
|
||||
parent.operations.addAll(operations);
|
||||
}
|
||||
if (batch != null) {
|
||||
committedAt = batch.sync(this);
|
||||
//TODO(gburd) update cache with writeTime...
|
||||
}
|
||||
|
||||
// All nested UnitOfWork should be committed (not aborted) before calls to
|
||||
|
@ -337,6 +337,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
|
|||
|
||||
// Merge cache and statistics into parent if there is one.
|
||||
parent.mergeCache(cache);
|
||||
parent.addBatched(batch);
|
||||
if (purpose != null) {
|
||||
parent.nestedPurposes.add(purpose);
|
||||
}
|
||||
|
@ -362,7 +363,15 @@ public abstract class AbstractUnitOfWork<E extends Exception>
|
|||
return new PostCommitFunction(this, postCommit);
|
||||
}
|
||||
|
||||
/* Explicitly discard the work and mark it as as such in the log. */
|
||||
private void addBatched(BatchOperation batch) {
|
||||
if (this.batch == null) {
|
||||
this.batch = batch;
|
||||
} else {
|
||||
this.batch.addAll(batch);
|
||||
}
|
||||
}
|
||||
|
||||
/* Explicitly discard the work and mark it as as such in the log. */
|
||||
public synchronized void abort() {
|
||||
TreeTraverser<AbstractUnitOfWork<E>> traverser =
|
||||
TreeTraverser.using(node -> node::getChildNodes);
|
||||
|
|
|
@ -19,6 +19,8 @@ import com.datastax.driver.core.Statement;
|
|||
import com.google.common.base.Stopwatch;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import net.helenus.core.cache.Facet;
|
||||
import net.helenus.core.operation.AbstractOperation;
|
||||
|
||||
|
@ -40,7 +42,7 @@ public interface UnitOfWork<X extends Exception> extends AutoCloseable {
|
|||
* @return a function from which to chain work that only happens when commit is successful
|
||||
* @throws X when the work overlaps with other concurrent writers.
|
||||
*/
|
||||
PostCommitFunction<Void, Void> commit() throws X;
|
||||
PostCommitFunction<Void, Void> commit() throws X, TimeoutException;
|
||||
|
||||
/**
|
||||
* Explicitly abort the work within this unit of work. Any nested aborted unit of work will
|
||||
|
|
|
@ -81,6 +81,13 @@ public class CacheUtil {
|
|||
return combinations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge changed values in the map behind `from` into `to`.
|
||||
*
|
||||
* @param to
|
||||
* @param from
|
||||
* @return
|
||||
*/
|
||||
public static Object merge(Object to, Object from) {
|
||||
if (to == from) {
|
||||
return to;
|
||||
|
@ -112,7 +119,5 @@ public class CacheUtil {
|
|||
return "_" + propertyName + "_writeTime";
|
||||
}
|
||||
|
||||
public static String ttlKey(String propertyName) {
|
||||
return "_" + propertyName + "_ttl";
|
||||
}
|
||||
public static String ttlKey(String propertyName) { return "_" + propertyName + "_ttl"; }
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import net.helenus.core.AbstractSessionOperations;
|
|||
import net.helenus.core.UnitOfWork;
|
||||
import net.helenus.core.cache.CacheUtil;
|
||||
import net.helenus.core.cache.Facet;
|
||||
import net.helenus.support.Fun;
|
||||
|
||||
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
|
||||
extends AbstractStatementOperation<E, O> {
|
||||
|
@ -98,9 +99,12 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
|
|||
}
|
||||
|
||||
if (updateCache && result.isPresent()) {
|
||||
List<Facet> facets = getFacets();
|
||||
if (facets != null && facets.size() > 1) {
|
||||
sessionOps.updateCache(result.get(), facets);
|
||||
E r = result.get();
|
||||
if (!(r instanceof Fun)) {
|
||||
List<Facet> facets = getFacets();
|
||||
if (facets != null && facets.size() > 1) {
|
||||
sessionOps.updateCache(r, facets);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
@ -186,8 +190,11 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
|
|||
|
||||
// If we have a result, it wasn't from the UOW cache, and we're caching things
|
||||
// then we need to put this result into the cache for future requests to find.
|
||||
if (updateCache && result.isPresent() && result.get() != deleted) {
|
||||
cacheUpdate(uow, result.get(), getFacets());
|
||||
if (updateCache && result.isPresent()) {
|
||||
E r = result.get();
|
||||
if (!(r instanceof Fun) && r != deleted) {
|
||||
cacheUpdate(uow, r, getFacets());
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
|
@ -43,11 +43,6 @@ import net.helenus.support.HelenusException;
|
|||
|
||||
public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>>
|
||||
extends Operation<E> {
|
||||
|
||||
protected boolean showValues = true;
|
||||
protected TraceContext traceContext;
|
||||
long queryExecutionTimeout = 10;
|
||||
TimeUnit queryTimeoutUnits = TimeUnit.SECONDS;
|
||||
private boolean ignoreCache = false;
|
||||
private ConsistencyLevel consistencyLevel;
|
||||
private ConsistencyLevel serialConsistencyLevel;
|
||||
|
|
|
@ -34,6 +34,7 @@ import net.helenus.core.AbstractSessionOperations;
|
|||
import net.helenus.core.UnitOfWork;
|
||||
import net.helenus.core.cache.CacheUtil;
|
||||
import net.helenus.core.cache.Facet;
|
||||
import net.helenus.support.Fun;
|
||||
|
||||
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
|
||||
extends AbstractStatementOperation<E, O> {
|
||||
|
@ -104,7 +105,9 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
|
|||
List<E> again = new ArrayList<>();
|
||||
resultStream.forEach(
|
||||
result -> {
|
||||
sessionOps.updateCache(result, facets);
|
||||
if (!(result instanceof Fun)) {
|
||||
sessionOps.updateCache(result, facets);
|
||||
}
|
||||
again.add(result);
|
||||
});
|
||||
resultStream = again.stream();
|
||||
|
@ -184,18 +187,18 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
|
|||
// If we have a result and we're caching then we need to put it into the cache
|
||||
// for future requests to find.
|
||||
if (resultStream != null) {
|
||||
List<E> again = new ArrayList<>();
|
||||
List<Facet> facets = getFacets();
|
||||
resultStream.forEach(
|
||||
result -> {
|
||||
if (result != deleted) {
|
||||
if (updateCache) {
|
||||
cacheUpdate(uow, result, facets);
|
||||
}
|
||||
again.add(result);
|
||||
}
|
||||
});
|
||||
resultStream = again.stream();
|
||||
if (updateCache) {
|
||||
List<E> again = new ArrayList<>();
|
||||
List<Facet> facets = getFacets();
|
||||
resultStream.forEach(
|
||||
result -> {
|
||||
if (result != deleted && !(result instanceof Fun)) {
|
||||
cacheUpdate(uow, result, facets);
|
||||
}
|
||||
again.add(result);
|
||||
});
|
||||
resultStream = again.stream();
|
||||
}
|
||||
}
|
||||
|
||||
return resultStream;
|
||||
|
|
108
src/main/java/net/helenus/core/operation/BatchOperation.java
Normal file
108
src/main/java/net/helenus/core/operation/BatchOperation.java
Normal file
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Copyright (C) 2015 The Helenus Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.datastax.driver.core.BatchStatement;
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.querybuilder.BuiltStatement;
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.core.UnitOfWork;
|
||||
import net.helenus.support.HelenusException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class BatchOperation extends Operation<Long> {
|
||||
private BatchStatement batch = null;
|
||||
private List<AbstractOperation<?, ?>> operations = new ArrayList<AbstractOperation<?, ?>>();
|
||||
private boolean logged = false;
|
||||
private long timestamp = 0L;
|
||||
|
||||
public BatchOperation(AbstractSessionOperations sessionOperations) {
|
||||
super(sessionOperations);
|
||||
}
|
||||
|
||||
public void add(AbstractOperation<?, ?> operation) {
|
||||
operations.add(operation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BatchStatement buildStatement(boolean cached) {
|
||||
batch = new BatchStatement();
|
||||
batch.addAll(operations.stream().map(o -> o.buildStatement(cached)).collect(Collectors.toList()));
|
||||
batch.setConsistencyLevel(sessionOps.getDefaultConsistencyLevel());
|
||||
timestamp = batch.getDefaultTimestamp();
|
||||
return batch;
|
||||
}
|
||||
|
||||
public BatchOperation logged() {
|
||||
logged = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BatchOperation setLogged(boolean logStatements) {
|
||||
logged = logStatements;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Long sync() throws TimeoutException {
|
||||
if (operations.size() == 0) return 0L;
|
||||
final Timer.Context context = requestLatency.time();
|
||||
try {
|
||||
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
|
||||
if (!resultSet.wasApplied()) {
|
||||
throw new HelenusException("Failed to apply batch.");
|
||||
}
|
||||
} finally {
|
||||
context.stop();
|
||||
}
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public Long sync(UnitOfWork<?> uow) throws TimeoutException {
|
||||
if (operations.size() == 0) return 0L;
|
||||
if (uow == null)
|
||||
return sync();
|
||||
|
||||
final Timer.Context context = requestLatency.time();
|
||||
try {
|
||||
ResultSet resultSet = this.execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
|
||||
if (!resultSet.wasApplied()) {
|
||||
throw new HelenusException("Failed to apply batch.");
|
||||
}
|
||||
} finally {
|
||||
context.stop();
|
||||
}
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public void addAll(BatchOperation batch) {
|
||||
batch.operations.forEach(o -> this.operations.add(o));
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
StringBuilder s = new StringBuilder();
|
||||
s.append("BEGIN ");
|
||||
if (!logged) { s.append("UN"); }
|
||||
s.append("LOGGED BATCH; ");
|
||||
s.append(operations.stream().map(o -> Operation.queryString(o.buildStatement(false), showValues)).collect(Collectors.joining("; ")));
|
||||
s.append(" APPLY BATCH;");
|
||||
return s.toString();
|
||||
}
|
||||
}
|
|
@ -56,6 +56,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
|
||||
private int[] ttl;
|
||||
private long[] timestamp;
|
||||
private long writeTime = 0L;
|
||||
|
||||
public InsertOperation(AbstractSessionOperations sessionOperations, boolean ifNotExists) {
|
||||
super(sessionOperations);
|
||||
|
@ -166,6 +167,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
insert.using(QueryBuilder.timestamp(this.timestamp[0]));
|
||||
}
|
||||
|
||||
writeTime = timestamp == null ? insert.getDefaultTimestamp() : timestamp[0];
|
||||
return insert;
|
||||
}
|
||||
|
||||
|
@ -191,8 +193,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
if (backingMap.containsKey(key)) {
|
||||
// Some values man need to be converted (e.g. from String to Enum). This is done
|
||||
// within the BeanColumnValueProvider below.
|
||||
Optional<Function<Object, Object>> converter =
|
||||
prop.getReadConverter(sessionOps.getSessionRepository());
|
||||
Optional<Function<Object, Object>> converter = prop.getReadConverter(sessionOps.getSessionRepository());
|
||||
if (converter.isPresent()) {
|
||||
backingMap.put(key, converter.get().apply(backingMap.get(key)));
|
||||
}
|
||||
|
@ -200,8 +201,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
// If we started this operation with an instance of this type, use values from
|
||||
// that.
|
||||
if (pojo != null) {
|
||||
backingMap.put(
|
||||
key, BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, immutable));
|
||||
backingMap.put(key, BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, immutable));
|
||||
} else {
|
||||
// Otherwise we'll use default values for the property type if available.
|
||||
Class<?> propType = prop.getJavaType();
|
||||
|
@ -250,7 +250,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
}
|
||||
|
||||
protected void adjustTtlAndWriteTime(MapExportable pojo) {
|
||||
if (ttl != null || timestamp != null) {
|
||||
if (ttl != null || writeTime != 0L) {
|
||||
List<String> propertyNames = values.stream()
|
||||
.map(t -> t._1.getProperty())
|
||||
.filter(prop -> {
|
||||
|
@ -262,15 +262,15 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
return true;
|
||||
}
|
||||
})
|
||||
.map(prop -> prop.getColumnName().toCql(true))
|
||||
.map(prop -> prop.getColumnName().toCql(false))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (propertyNames.size() > 0) {
|
||||
if (ttl != null) {
|
||||
propertyNames.forEach(name -> pojo.put(CacheUtil.ttlKey(name), ttl));
|
||||
}
|
||||
if (timestamp != null) {
|
||||
propertyNames.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), timestamp));
|
||||
if (writeTime != 0L) {
|
||||
propertyNames.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), writeTime));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -280,8 +280,8 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
public T sync() throws TimeoutException {
|
||||
T result = super.sync();
|
||||
if (entity.isCacheable() && result != null) {
|
||||
sessionOps.updateCache(result, bindFacetValues());
|
||||
adjustTtlAndWriteTime((MapExportable)result);
|
||||
sessionOps.updateCache(result, bindFacetValues());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -303,8 +303,8 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
}
|
||||
Class<?> iface = entity.getMappingInterface();
|
||||
if (resultType == iface) {
|
||||
adjustTtlAndWriteTime((MapExportable)result);
|
||||
cacheUpdate(uow, result, bindFacetValues());
|
||||
adjustTtlAndWriteTime((MapExportable)pojo);
|
||||
} else {
|
||||
if (entity.isCacheable()) {
|
||||
sessionOps.cacheEvict(bindFacetValues());
|
||||
|
@ -321,8 +321,8 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
if (this.entity != null && pojo != null) {
|
||||
Class<?> iface = this.entity.getMappingInterface();
|
||||
if (resultType == iface) {
|
||||
cacheUpdate(uow, pojo, bindFacetValues());
|
||||
adjustTtlAndWriteTime((MapExportable)pojo);
|
||||
cacheUpdate(uow, pojo, bindFacetValues());
|
||||
uow.batch(this);
|
||||
return (T) pojo;
|
||||
}
|
||||
|
|
|
@ -42,6 +42,10 @@ public abstract class Operation<E> {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(Operation.class);
|
||||
|
||||
protected final AbstractSessionOperations sessionOps;
|
||||
protected boolean showValues = true;
|
||||
protected TraceContext traceContext;
|
||||
protected long queryExecutionTimeout = 10;
|
||||
protected TimeUnit queryTimeoutUnits = TimeUnit.SECONDS;
|
||||
protected final Meter uowCacheHits;
|
||||
protected final Meter uowCacheMiss;
|
||||
protected final Meter sessionCacheHits;
|
||||
|
@ -177,7 +181,7 @@ public abstract class Operation<E> {
|
|||
timerString = String.format(" %s ", timer.toString());
|
||||
}
|
||||
LOG.info(
|
||||
String.format("%s%s%s", uowString, timerString, Operation.queryString(statement, false)));
|
||||
String.format("%s%s%s", uowString, timerString, Operation.queryString(statement, showValues)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -94,13 +94,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
|
|||
.forEach(p -> this.props.add(p));
|
||||
|
||||
this.isCacheable = entity.isCacheable();
|
||||
this.implementsEntityType = MappingUtil.extendsInterface(entity.getMappingInterface(), Entity.class);
|
||||
this.implementsEntityType = Entity.class.isAssignableFrom(entity.getMappingInterface());
|
||||
}
|
||||
|
||||
public SelectOperation(
|
||||
AbstractSessionOperations sessionOperations,
|
||||
HelenusEntity entity,
|
||||
Function<Row, E> rowMapper) {
|
||||
public SelectOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity, Function<Row, E> rowMapper) {
|
||||
|
||||
super(sessionOperations);
|
||||
this.rowMapper = rowMapper;
|
||||
|
@ -112,7 +109,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
|
|||
.forEach(p -> this.props.add(p));
|
||||
|
||||
this.isCacheable = entity.isCacheable();
|
||||
this.implementsEntityType = MappingUtil.extendsInterface(entity.getMappingInterface(), Entity.class);
|
||||
this.implementsEntityType = Entity.class.isAssignableFrom(entity.getMappingInterface());
|
||||
}
|
||||
|
||||
public SelectOperation(AbstractSessionOperations sessionOperations, Function<Row, E> rowMapper,
|
||||
|
@ -125,7 +122,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
|
|||
|
||||
HelenusEntity entity = props[0].getEntity();
|
||||
this.isCacheable = entity.isCacheable();
|
||||
this.implementsEntityType = MappingUtil.extendsInterface(entity.getMappingInterface(), Entity.class);
|
||||
this.implementsEntityType = Entity.class.isAssignableFrom(entity.getMappingInterface());
|
||||
}
|
||||
|
||||
public CountOperation count() {
|
||||
|
|
|
@ -51,6 +51,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
|
|||
private Object pojo;
|
||||
private int[] ttl;
|
||||
private long[] timestamp;
|
||||
private long writeTime = 0L;
|
||||
|
||||
public UpdateOperation(AbstractSessionOperations sessionOperations) {
|
||||
super(sessionOperations);
|
||||
|
@ -719,6 +720,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
|
|||
update.using(QueryBuilder.timestamp(this.timestamp[0]));
|
||||
}
|
||||
|
||||
writeTime = timestamp == null ? update.getDefaultTimestamp() : timestamp[0];
|
||||
return update;
|
||||
}
|
||||
|
||||
|
@ -760,11 +762,11 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
|
|||
}
|
||||
|
||||
private void adjustTtlAndWriteTime(MapExportable pojo) {
|
||||
if (ttl != null || timestamp != null) {
|
||||
if (ttl != null || writeTime != 0L) {
|
||||
List<String> names = new ArrayList<String>(assignments.size());
|
||||
for (BoundFacet facet : assignments.values()) {
|
||||
for (HelenusProperty prop : facet.getProperties()) {
|
||||
names.add(prop.getColumnName().toCql(true));
|
||||
names.add(prop.getColumnName().toCql(false));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -772,8 +774,8 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
|
|||
if (ttl != null) {
|
||||
names.forEach(name -> pojo.put(CacheUtil.ttlKey(name), ttl));
|
||||
}
|
||||
if (timestamp != null) {
|
||||
names.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), timestamp));
|
||||
if (writeTime != 0L) {
|
||||
names.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), writeTime));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -803,8 +805,11 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
|
|||
}
|
||||
E result = super.sync(uow);
|
||||
if (draft != null) {
|
||||
cacheUpdate(uow, result, bindFacetValues());
|
||||
adjustTtlAndWriteTime(draft);
|
||||
if (entity != null && MapExportable.class.isAssignableFrom(entity.getMappingInterface())) {
|
||||
adjustTtlAndWriteTime((MapExportable) result);
|
||||
}
|
||||
cacheUpdate(uow, result, bindFacetValues());
|
||||
} else if (pojo != null) {
|
||||
cacheUpdate(uow, (E) pojo, bindFacetValues());
|
||||
adjustTtlAndWriteTime((MapExportable)pojo);
|
||||
|
|
|
@ -122,13 +122,13 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
|
|||
key = CacheUtil.writeTimeKey((String)args[0]);
|
||||
} else if (args[0] instanceof Getter) {
|
||||
Getter getter = (Getter)args[0];
|
||||
key = CacheUtil.writeTimeKey(MappingUtil.resolveMappingProperty(getter).getProperty().getPropertyName());
|
||||
key = CacheUtil.writeTimeKey(MappingUtil.resolveMappingProperty(getter).getProperty().getColumnName().toCql(false));
|
||||
} else {
|
||||
return 0L;
|
||||
}
|
||||
long[] v = (long[])src.get(key);
|
||||
Long v = (Long)src.get(key);
|
||||
if (v != null) {
|
||||
return v[0];
|
||||
return v;
|
||||
}
|
||||
return 0L;
|
||||
}
|
||||
|
@ -139,7 +139,7 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
|
|||
key = CacheUtil.ttlKey((String)args[0]);
|
||||
} else if (args[0] instanceof Getter) {
|
||||
Getter getter = (Getter)args[0];
|
||||
key = CacheUtil.ttlKey(MappingUtil.resolveMappingProperty(getter).getProperty().getColumnName().toCql(true));
|
||||
key = CacheUtil.ttlKey(MappingUtil.resolveMappingProperty(getter).getProperty().getColumnName().toCql(false));
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -314,15 +314,6 @@ public final class MappingUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public static boolean extendsInterface(Class<?> clazz, Class<?> iface) {
|
||||
Class<?>[] interfaces = clazz.getInterfaces();
|
||||
for (Class<?> i : interfaces) {
|
||||
if (i == iface)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static void rethrow(Throwable cause) throws CloneNotSupportedException {
|
||||
if (cause instanceof RuntimeException) {
|
||||
throw (RuntimeException) cause;
|
||||
|
|
|
@ -347,19 +347,19 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
|
|||
|
||||
@Test
|
||||
public void testBatchingUpdatesAndInserts() throws Exception {
|
||||
Widget w1, w2, w3, w4, w5;
|
||||
Widget w1, w2, w3, w4, w5, w6;
|
||||
Long committedAt = 0L;
|
||||
UUID key = UUIDs.timeBased();
|
||||
|
||||
try (UnitOfWork uow = session.begin()) {
|
||||
w1 = session.<Widget>upsert(widget)
|
||||
.value(widget::id, key)
|
||||
.value(widget::name, RandomString.make(20))
|
||||
.value(widget::a, RandomString.make(10))
|
||||
.value(widget::b, RandomString.make(10))
|
||||
.value(widget::c, RandomString.make(10))
|
||||
.value(widget::d, RandomString.make(10))
|
||||
.batch(uow);
|
||||
w1 = session.<Widget>upsert(widget)
|
||||
.value(widget::id, key)
|
||||
.value(widget::name, RandomString.make(20))
|
||||
.value(widget::a, RandomString.make(10))
|
||||
.value(widget::b, RandomString.make(10))
|
||||
.value(widget::c, RandomString.make(10))
|
||||
.value(widget::d, RandomString.make(10))
|
||||
.batch(uow);
|
||||
Assert.assertTrue(0L == w1.writtenAt(widget::name));
|
||||
Assert.assertTrue(0 == w1.ttlOf(widget::name));
|
||||
w2 = session.<Widget>update(w1)
|
||||
|
@ -378,7 +378,17 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
|
|||
Assert.assertEquals(w2, w3);
|
||||
Assert.assertTrue(0L == w3.writtenAt(widget::name));
|
||||
Assert.assertTrue(30 <= w3.ttlOf(widget::name));
|
||||
uow.commit();
|
||||
|
||||
w6 = session.<Widget>upsert(widget)
|
||||
.value(widget::id, UUIDs.timeBased())
|
||||
.value(widget::name, RandomString.make(20))
|
||||
.value(widget::a, RandomString.make(10))
|
||||
.value(widget::b, RandomString.make(10))
|
||||
.value(widget::c, RandomString.make(10))
|
||||
.value(widget::d, RandomString.make(10))
|
||||
.batch(uow);
|
||||
|
||||
uow.commit();
|
||||
committedAt = uow.committedAt();
|
||||
}
|
||||
// 'c' is distinct, but not on it's own so this should miss cache
|
||||
|
@ -401,6 +411,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
|
|||
Assert.assertTrue(w5.writtenAt(widget::name) == committedAt);
|
||||
int ttl5 = w5.ttlOf(widget::name);
|
||||
Assert.assertTrue(ttl5 <= 30);
|
||||
Assert.assertTrue(w4.writtenAt(widget::name) == w6.writtenAt(widget::name));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -414,6 +425,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
|
|||
.value(widget::id, key1)
|
||||
.value(widget::name, RandomString.make(20))
|
||||
.sync(uow);
|
||||
/*
|
||||
w2 = session.<Widget>upsert(w1)
|
||||
.value(widget::a, RandomString.make(10))
|
||||
.value(widget::b, RandomString.make(10))
|
||||
|
@ -421,8 +433,10 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
|
|||
.value(widget::d, RandomString.make(10))
|
||||
.sync(uow);
|
||||
uow.commit();
|
||||
*/
|
||||
uow.abort();
|
||||
}
|
||||
//TODO(gburd): Assert.assertEquals(w1, w2);
|
||||
//Assert.assertEquals(w1, w2);
|
||||
}
|
||||
|
||||
@Test public void testSelectAfterInsertProperlyCachesEntity() throws
|
||||
|
|
Loading…
Reference in a new issue