No longer add allowFiltering based on entity properties, force user to add as required. Add information about Cassandra execution to the UOW log line when DEBUG. Throw errors when execution doesn't complete properly.
This commit is contained in:
parent
792d2b6598
commit
377191f12a
10 changed files with 132 additions and 26 deletions
14
NOTES
14
NOTES
|
@ -169,3 +169,17 @@ begin:
|
|||
cache.put
|
||||
}
|
||||
*/
|
||||
------------------
|
||||
|
||||
InsertOperation
|
||||
|
||||
|
||||
Class<?> iface = entity.getMappingInterface();
|
||||
boolean includesNonIdentityValues = values.stream().map(t -> {
|
||||
ColumnType type = t._1.getProperty().getColumnType();
|
||||
return !((type == ColumnType.PARTITION_KEY) || (type == ColumnType.CLUSTERING_COLUMN));
|
||||
})
|
||||
.reduce(false, (acc, t) -> acc || t);
|
||||
if (resultType == iface) {
|
||||
if (values.size() > 0 && includesNonIdentityValues) {
|
||||
boolean immutable = iface.isAssignableFrom(Drafted.class);
|
||||
|
|
|
@ -118,7 +118,7 @@ public abstract class AbstractSessionOperations {
|
|||
private void logStatement(Statement statement, boolean showValues) {
|
||||
if (isShowCql()) {
|
||||
printCql(Operation.queryString(statement, showValues));
|
||||
} else if (LOG.isInfoEnabled()) {
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.info("CQL> " + Operation.queryString(statement, showValues));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
|
||||
protected String purpose;
|
||||
protected List<String> nestedPurposes = new ArrayList<String>();
|
||||
protected String info;
|
||||
protected int cacheHits = 0;
|
||||
protected int cacheMisses = 0;
|
||||
protected int databaseLookups = 0;
|
||||
|
@ -104,6 +105,11 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setInfo(String info) {
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recordCacheAndDatabaseOperationCount(int cache, int ops) {
|
||||
if (cache > 0) {
|
||||
|
@ -147,9 +153,10 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
}
|
||||
String x = nestedPurposes.stream().distinct().collect(Collectors.joining(", "));
|
||||
String n = nested.stream().map(uow -> String.valueOf(uow.hashCode())).collect(Collectors.joining(", "));
|
||||
String s = String.format(Locale.US, "UOW(%s%s) %s in %,.3fms%s%s%s%s%s", hashCode(),
|
||||
String s = String.format(Locale.US, "UOW(%s%s) %s in %,.3fms%s%s%s%s%s%s", hashCode(),
|
||||
(nested.size() > 0 ? ", [" + n + "]" : ""), what, e, cache, database, da,
|
||||
(purpose == null ? "" : " " + purpose), (nestedPurposes.isEmpty()) ? "" : ", " + x);
|
||||
(purpose == null ? "" : " " + purpose), (nestedPurposes.isEmpty()) ? "" : ", " + x,
|
||||
(info == null) ? "" : " " + info);
|
||||
return s;
|
||||
}
|
||||
|
||||
|
|
|
@ -63,6 +63,7 @@ public interface UnitOfWork<X extends Exception> extends AutoCloseable {
|
|||
|
||||
String getPurpose();
|
||||
UnitOfWork setPurpose(String purpose);
|
||||
void setInfo(String info);
|
||||
|
||||
void addDatabaseTime(String name, Stopwatch amount);
|
||||
void addCacheLookupTime(Stopwatch amount);
|
||||
|
|
|
@ -24,14 +24,14 @@ import com.datastax.driver.core.querybuilder.BuiltStatement;
|
|||
import com.datastax.driver.core.querybuilder.Insert;
|
||||
import com.datastax.driver.core.querybuilder.QueryBuilder;
|
||||
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.core.Getter;
|
||||
import net.helenus.core.Helenus;
|
||||
import net.helenus.core.UnitOfWork;
|
||||
import net.helenus.core.*;
|
||||
import net.helenus.core.cache.CacheUtil;
|
||||
import net.helenus.core.cache.Facet;
|
||||
import net.helenus.core.cache.UnboundFacet;
|
||||
import net.helenus.core.reflect.DefaultPrimitiveTypes;
|
||||
import net.helenus.core.reflect.Drafted;
|
||||
import net.helenus.core.reflect.HelenusPropertyNode;
|
||||
import net.helenus.mapping.ColumnType;
|
||||
import net.helenus.mapping.HelenusEntity;
|
||||
import net.helenus.mapping.HelenusProperty;
|
||||
import net.helenus.mapping.MappingUtil;
|
||||
|
@ -163,6 +163,10 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
|
||||
@Override
|
||||
public T transform(ResultSet resultSet) {
|
||||
if ((ifNotExists == true) && (resultSet.wasApplied() == false)) {
|
||||
throw new HelenusException("Statement was not applied due to consistency constraints");
|
||||
}
|
||||
|
||||
Class<?> iface = entity.getMappingInterface();
|
||||
if (resultType == iface) {
|
||||
if (values.size() > 0) {
|
||||
|
@ -207,9 +211,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
// Lastly, create a new proxy object for the entity and return the new instance.
|
||||
return (T) Helenus.map(iface, backingMap);
|
||||
}
|
||||
// Oddly, this insert didn't change any value so simply return the pojo.
|
||||
// TODO(gburd): this pojo is the result of a Draft.build() call which will not
|
||||
// preserve object identity (o1 == o2), ... fix me.
|
||||
// Oddly, this insert didn't change anything so simply return the pojo.
|
||||
return (T) pojo;
|
||||
}
|
||||
return (T) resultSet;
|
||||
|
@ -251,6 +253,15 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
return sync();
|
||||
}
|
||||
T result = super.sync(uow);
|
||||
if (result != null && pojo != null && !(pojo == result) && pojo.equals(result)) {
|
||||
// To preserve object identity we need to find this object in cache
|
||||
// because it was unchanged by the INSERT but pojo in this case was
|
||||
// the result of a draft.build().
|
||||
T cachedValue = (T) uow.cacheLookup(bindFacetValues());
|
||||
if (cachedValue != null) {
|
||||
result = cachedValue;
|
||||
}
|
||||
}
|
||||
Class<?> iface = entity.getMappingInterface();
|
||||
if (resultType == iface) {
|
||||
cacheUpdate(uow, result, entity.getFacets());
|
||||
|
@ -262,6 +273,36 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
|
|||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Facet> bindFacetValues() {
|
||||
List<Facet> facets = getFacets();
|
||||
if (facets == null || facets.size() == 0) {
|
||||
return new ArrayList<Facet>();
|
||||
}
|
||||
List<Facet> boundFacets = new ArrayList<>();
|
||||
Map<HelenusProperty, Object> valuesMap = new HashMap<>(values.size());
|
||||
values.forEach(t -> valuesMap.put(t._1.getProperty(), t._2));
|
||||
|
||||
for (Facet facet : facets) {
|
||||
if (facet instanceof UnboundFacet) {
|
||||
UnboundFacet unboundFacet = (UnboundFacet) facet;
|
||||
UnboundFacet.Binder binder = unboundFacet.binder();
|
||||
for (HelenusProperty prop : unboundFacet.getProperties()) {
|
||||
Object value = valuesMap.get(prop);
|
||||
if (value != null) {
|
||||
binder.setValueForProperty(prop, value.toString());
|
||||
}
|
||||
}
|
||||
if (binder.isBound()) {
|
||||
boundFacets.add(binder.bind());
|
||||
}
|
||||
} else {
|
||||
boundFacets.add(facet);
|
||||
}
|
||||
}
|
||||
return boundFacets;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Facet> getFacets() {
|
||||
if (entity != null) {
|
||||
|
|
|
@ -15,21 +15,22 @@
|
|||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.datastax.driver.core.*;
|
||||
import net.helenus.support.HelenusException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.datastax.driver.core.RegularStatement;
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.ResultSetFuture;
|
||||
import com.datastax.driver.core.Statement;
|
||||
import com.datastax.driver.core.querybuilder.BuiltStatement;
|
||||
import com.google.common.base.Stopwatch;
|
||||
|
||||
|
@ -113,6 +114,33 @@ public abstract class Operation<E> {
|
|||
if (uow != null)
|
||||
uow.recordCacheAndDatabaseOperationCount(0, 1);
|
||||
ResultSet resultSet = futureResultSet.getUninterruptibly(timeout, units);
|
||||
ColumnDefinitions columnDefinitions = resultSet.getColumnDefinitions();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
ExecutionInfo ei = resultSet.getExecutionInfo();
|
||||
Host qh = ei.getQueriedHost();
|
||||
String oh = ei.getTriedHosts().stream().map(Host::getAddress).map(InetAddress::toString).collect(Collectors.joining(", "));
|
||||
ConsistencyLevel cl = ei.getAchievedConsistencyLevel();
|
||||
int se = ei.getSpeculativeExecutions();
|
||||
String warn = ei.getWarnings().stream().collect(Collectors.joining(", "));
|
||||
String ri = String.format("%s %s %s %s %sd%s%s spec-retries: %d",
|
||||
"C* v" + qh.getCassandraVersion(),
|
||||
qh.getAddress().toString(),
|
||||
qh.getDatacenter(),
|
||||
qh.getRack(),
|
||||
(oh != null && !oh.equals("")) ? "[" + oh + "] " : "",
|
||||
(cl != null) ? (" consistency: " + cl.name() +
|
||||
(cl.isDCLocal() ? " DC " : "") +
|
||||
(cl.isSerial() ? " SC " : "")) : "",
|
||||
(warn != null && !warn.equals("")) ? ": " + warn : "",
|
||||
se);
|
||||
if (uow != null)
|
||||
uow.setInfo(ri);
|
||||
else
|
||||
LOG.debug(ri);
|
||||
}
|
||||
if (!resultSet.wasApplied() && !(columnDefinitions.size() > 1 || !columnDefinitions.contains("[applied]"))) {
|
||||
throw new HelenusException("Operation Failed");
|
||||
}
|
||||
return resultSet;
|
||||
|
||||
} finally {
|
||||
|
|
|
@ -239,10 +239,6 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
|
|||
String columnName = prop.getColumnName();
|
||||
selection = selection.column(columnName);
|
||||
|
||||
if (prop.getProperty().caseSensitiveIndex()) {
|
||||
allowFiltering = true;
|
||||
}
|
||||
|
||||
if (entity == null) {
|
||||
entity = prop.getEntity();
|
||||
} else if (entity != prop.getEntity()) {
|
||||
|
|
|
@ -36,6 +36,7 @@ import net.helenus.mapping.HelenusProperty;
|
|||
import net.helenus.mapping.MappingUtil;
|
||||
import net.helenus.mapping.value.BeanColumnValueProvider;
|
||||
import net.helenus.mapping.value.ValueProviderMap;
|
||||
import net.helenus.support.HelenusException;
|
||||
import net.helenus.support.HelenusMappingException;
|
||||
import net.helenus.support.Immutables;
|
||||
|
||||
|
@ -656,6 +657,10 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
|
|||
|
||||
@Override
|
||||
public E transform(ResultSet resultSet) {
|
||||
if ((ifFilters != null && !ifFilters.isEmpty()) && (resultSet.wasApplied() == false)) {
|
||||
throw new HelenusException("Statement was not applied due to consistency constraints");
|
||||
}
|
||||
|
||||
if (draft != null) {
|
||||
return Helenus.map(draft.getEntityClass(), draft.toMap(draftMap));
|
||||
} else {
|
||||
|
|
|
@ -190,15 +190,15 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
|
|||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSelectAfterDeleted() throws Exception {
|
||||
Widget w1, w2, w3, w4;
|
||||
UUID key = UUIDs.timeBased();
|
||||
@Test
|
||||
public void testSelectAfterDeleted() throws Exception {
|
||||
Widget w1, w2, w3, w4;
|
||||
UUID key = UUIDs.timeBased();
|
||||
|
||||
// This should inserted Widget, but not cache it.
|
||||
w1 = session.<Widget>insert(widget).value(widget::id, key).value(widget::name, RandomString.make(20)).sync();
|
||||
// This should inserted Widget, but not cache it.
|
||||
w1 = session.<Widget>insert(widget).value(widget::id, key).value(widget::name, RandomString.make(20)).sync();
|
||||
|
||||
try (UnitOfWork uow = session.begin()) {
|
||||
try (UnitOfWork uow = session.begin()) {
|
||||
|
||||
// This should read from the database and return a Widget.
|
||||
w2 = session.<Widget>select(widget).where(widget::id, eq(key)).single()
|
||||
|
@ -225,7 +225,21 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
|
|||
|
||||
Assert.assertEquals(w4, null);
|
||||
}
|
||||
/*
|
||||
@Test
|
||||
public void testInsertNoOp() throws Exception {
|
||||
Widget w1, w2;
|
||||
UUID key = UUIDs.timeBased();
|
||||
|
||||
|
||||
try (UnitOfWork uow = session.begin()) {
|
||||
// This should inserted Widget, but not cache it.
|
||||
w1 = session.<Widget>insert(widget).value(widget::id, key).value(widget::name, RandomString.make(20)).sync(uow);
|
||||
w2 = session.<Widget>insert(w1).value(widget::id, key).sync(uow);
|
||||
}
|
||||
Assert.assertEquals(w1, w2);
|
||||
}
|
||||
*/
|
||||
/*
|
||||
* @Test public void testSelectAfterInsertProperlyCachesEntity() throws
|
||||
* Exception { Widget w1, w2, w3, w4; UUID key = UUIDs.timeBased();
|
||||
|
|
|
@ -65,6 +65,6 @@ public class MaterializedViewTest extends AbstractEmbeddedCassandraTest {
|
|||
|
||||
@Test
|
||||
public void testMv() throws TimeoutException {
|
||||
session.select(Cyclist.class).from(CyclistsByAge.class).where(cyclist::age, eq(18)).sync();
|
||||
session.select(Cyclist.class).from(CyclistsByAge.class).where(cyclist::age, eq(18)).allowFiltering().sync();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue