diff --git a/NOTES b/NOTES index 1b7148e..65017da 100644 --- a/NOTES +++ b/NOTES @@ -169,3 +169,17 @@ begin: cache.put } */ +------------------ + +InsertOperation + + + Class iface = entity.getMappingInterface(); + boolean includesNonIdentityValues = values.stream().map(t -> { + ColumnType type = t._1.getProperty().getColumnType(); + return !((type == ColumnType.PARTITION_KEY) || (type == ColumnType.CLUSTERING_COLUMN)); + }) + .reduce(false, (acc, t) -> acc || t); + if (resultType == iface) { + if (values.size() > 0 && includesNonIdentityValues) { + boolean immutable = iface.isAssignableFrom(Drafted.class); diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index 3f8eba2..df10323 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -118,7 +118,7 @@ public abstract class AbstractSessionOperations { private void logStatement(Statement statement, boolean showValues) { if (isShowCql()) { printCql(Operation.queryString(statement, showValues)); - } else if (LOG.isInfoEnabled()) { + } else if (LOG.isDebugEnabled()) { LOG.info("CQL> " + Operation.queryString(statement, showValues)); } } diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index 5793f8c..b381e6f 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -45,6 +45,7 @@ public abstract class AbstractUnitOfWork implements UnitOfW private final Table>> cache = HashBasedTable.create(); protected String purpose; protected List nestedPurposes = new ArrayList(); + protected String info; protected int cacheHits = 0; protected int cacheMisses = 0; protected int databaseLookups = 0; @@ -104,6 +105,11 @@ public abstract class AbstractUnitOfWork implements UnitOfW return this; } + @Override + public void setInfo(String info) { + this.info = info; + } + @Override public void recordCacheAndDatabaseOperationCount(int cache, int ops) { if (cache > 0) { @@ -147,9 +153,10 @@ public abstract class AbstractUnitOfWork implements UnitOfW } String x = nestedPurposes.stream().distinct().collect(Collectors.joining(", ")); String n = nested.stream().map(uow -> String.valueOf(uow.hashCode())).collect(Collectors.joining(", ")); - String s = String.format(Locale.US, "UOW(%s%s) %s in %,.3fms%s%s%s%s%s", hashCode(), + String s = String.format(Locale.US, "UOW(%s%s) %s in %,.3fms%s%s%s%s%s%s", hashCode(), (nested.size() > 0 ? ", [" + n + "]" : ""), what, e, cache, database, da, - (purpose == null ? "" : " " + purpose), (nestedPurposes.isEmpty()) ? "" : ", " + x); + (purpose == null ? "" : " " + purpose), (nestedPurposes.isEmpty()) ? "" : ", " + x, + (info == null) ? "" : " " + info); return s; } diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 5d55853..9c1bc25 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -63,6 +63,7 @@ public interface UnitOfWork extends AutoCloseable { String getPurpose(); UnitOfWork setPurpose(String purpose); + void setInfo(String info); void addDatabaseTime(String name, Stopwatch amount); void addCacheLookupTime(Stopwatch amount); diff --git a/src/main/java/net/helenus/core/operation/InsertOperation.java b/src/main/java/net/helenus/core/operation/InsertOperation.java index 91959ba..405e0c2 100644 --- a/src/main/java/net/helenus/core/operation/InsertOperation.java +++ b/src/main/java/net/helenus/core/operation/InsertOperation.java @@ -24,14 +24,14 @@ import com.datastax.driver.core.querybuilder.BuiltStatement; import com.datastax.driver.core.querybuilder.Insert; import com.datastax.driver.core.querybuilder.QueryBuilder; -import net.helenus.core.AbstractSessionOperations; -import net.helenus.core.Getter; -import net.helenus.core.Helenus; -import net.helenus.core.UnitOfWork; +import net.helenus.core.*; +import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; +import net.helenus.core.cache.UnboundFacet; import net.helenus.core.reflect.DefaultPrimitiveTypes; import net.helenus.core.reflect.Drafted; import net.helenus.core.reflect.HelenusPropertyNode; +import net.helenus.mapping.ColumnType; import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.HelenusProperty; import net.helenus.mapping.MappingUtil; @@ -163,6 +163,10 @@ public final class InsertOperation extends AbstractOperation iface = entity.getMappingInterface(); if (resultType == iface) { if (values.size() > 0) { @@ -207,9 +211,7 @@ public final class InsertOperation extends AbstractOperation extends AbstractOperation iface = entity.getMappingInterface(); if (resultType == iface) { cacheUpdate(uow, result, entity.getFacets()); @@ -262,6 +273,36 @@ public final class InsertOperation extends AbstractOperation bindFacetValues() { + List facets = getFacets(); + if (facets == null || facets.size() == 0) { + return new ArrayList(); + } + List boundFacets = new ArrayList<>(); + Map valuesMap = new HashMap<>(values.size()); + values.forEach(t -> valuesMap.put(t._1.getProperty(), t._2)); + + for (Facet facet : facets) { + if (facet instanceof UnboundFacet) { + UnboundFacet unboundFacet = (UnboundFacet) facet; + UnboundFacet.Binder binder = unboundFacet.binder(); + for (HelenusProperty prop : unboundFacet.getProperties()) { + Object value = valuesMap.get(prop); + if (value != null) { + binder.setValueForProperty(prop, value.toString()); + } + } + if (binder.isBound()) { + boundFacets.add(binder.bind()); + } + } else { + boundFacets.add(facet); + } + } + return boundFacets; + } + @Override public List getFacets() { if (entity != null) { diff --git a/src/main/java/net/helenus/core/operation/Operation.java b/src/main/java/net/helenus/core/operation/Operation.java index 00262bb..e0d18a0 100644 --- a/src/main/java/net/helenus/core/operation/Operation.java +++ b/src/main/java/net/helenus/core/operation/Operation.java @@ -15,21 +15,22 @@ */ package net.helenus.core.operation; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import com.datastax.driver.core.*; +import net.helenus.support.HelenusException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.datastax.driver.core.RegularStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.BuiltStatement; import com.google.common.base.Stopwatch; @@ -113,6 +114,33 @@ public abstract class Operation { if (uow != null) uow.recordCacheAndDatabaseOperationCount(0, 1); ResultSet resultSet = futureResultSet.getUninterruptibly(timeout, units); + ColumnDefinitions columnDefinitions = resultSet.getColumnDefinitions(); + if (LOG.isDebugEnabled()) { + ExecutionInfo ei = resultSet.getExecutionInfo(); + Host qh = ei.getQueriedHost(); + String oh = ei.getTriedHosts().stream().map(Host::getAddress).map(InetAddress::toString).collect(Collectors.joining(", ")); + ConsistencyLevel cl = ei.getAchievedConsistencyLevel(); + int se = ei.getSpeculativeExecutions(); + String warn = ei.getWarnings().stream().collect(Collectors.joining(", ")); + String ri = String.format("%s %s %s %s %sd%s%s spec-retries: %d", + "C* v" + qh.getCassandraVersion(), + qh.getAddress().toString(), + qh.getDatacenter(), + qh.getRack(), + (oh != null && !oh.equals("")) ? "[" + oh + "] " : "", + (cl != null) ? (" consistency: " + cl.name() + + (cl.isDCLocal() ? " DC " : "") + + (cl.isSerial() ? " SC " : "")) : "", + (warn != null && !warn.equals("")) ? ": " + warn : "", + se); + if (uow != null) + uow.setInfo(ri); + else + LOG.debug(ri); + } + if (!resultSet.wasApplied() && !(columnDefinitions.size() > 1 || !columnDefinitions.contains("[applied]"))) { + throw new HelenusException("Operation Failed"); + } return resultSet; } finally { diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index 497868b..dcd624b 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -239,10 +239,6 @@ public final class SelectOperation extends AbstractFilterStreamOperation extends AbstractFilterOperationinsert(widget).value(widget::id, key).value(widget::name, RandomString.make(20)).sync(); + // This should inserted Widget, but not cache it. + w1 = session.insert(widget).value(widget::id, key).value(widget::name, RandomString.make(20)).sync(); - try (UnitOfWork uow = session.begin()) { + try (UnitOfWork uow = session.begin()) { // This should read from the database and return a Widget. w2 = session.select(widget).where(widget::id, eq(key)).single() @@ -225,7 +225,21 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { Assert.assertEquals(w4, null); } +/* + @Test + public void testInsertNoOp() throws Exception { + Widget w1, w2; + UUID key = UUIDs.timeBased(); + + try (UnitOfWork uow = session.begin()) { + // This should inserted Widget, but not cache it. + w1 = session.insert(widget).value(widget::id, key).value(widget::name, RandomString.make(20)).sync(uow); + w2 = session.insert(w1).value(widget::id, key).sync(uow); + } + Assert.assertEquals(w1, w2); + } +*/ /* * @Test public void testSelectAfterInsertProperlyCachesEntity() throws * Exception { Widget w1, w2, w3, w4; UUID key = UUIDs.timeBased(); diff --git a/src/test/java/net/helenus/test/integration/core/views/MaterializedViewTest.java b/src/test/java/net/helenus/test/integration/core/views/MaterializedViewTest.java index 8d4a431..96a504a 100644 --- a/src/test/java/net/helenus/test/integration/core/views/MaterializedViewTest.java +++ b/src/test/java/net/helenus/test/integration/core/views/MaterializedViewTest.java @@ -65,6 +65,6 @@ public class MaterializedViewTest extends AbstractEmbeddedCassandraTest { @Test public void testMv() throws TimeoutException { - session.select(Cyclist.class).from(CyclistsByAge.class).where(cyclist::age, eq(18)).sync(); + session.select(Cyclist.class).from(CyclistsByAge.class).where(cyclist::age, eq(18)).allowFiltering().sync(); } }