From 5570a97dff0f1fcb9cb7a8f8322fa2b7e8a48f50 Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Wed, 8 Nov 2017 13:50:39 -0500 Subject: [PATCH] Improved support for batched statements. --- .../net/helenus/core/AbstractUnitOfWork.java | 35 +++--- .../java/net/helenus/core/UnitOfWork.java | 4 +- .../net/helenus/core/cache/CacheUtil.java | 11 +- .../operation/AbstractOptionalOperation.java | 17 ++- .../operation/AbstractStatementOperation.java | 5 - .../operation/AbstractStreamOperation.java | 29 ++--- .../core/operation/BatchOperation.java | 108 ++++++++++++++++++ .../core/operation/InsertOperation.java | 22 ++-- .../net/helenus/core/operation/Operation.java | 6 +- .../core/operation/SelectOperation.java | 11 +- .../core/operation/UpdateOperation.java | 15 ++- .../core/reflect/MapperInvocationHandler.java | 8 +- .../java/net/helenus/mapping/MappingUtil.java | 9 -- .../core/unitofwork/UnitOfWorkTest.java | 36 ++++-- 14 files changed, 228 insertions(+), 88 deletions(-) create mode 100644 src/main/java/net/helenus/core/operation/BatchOperation.java diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index d728f88..af4cf65 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -18,6 +18,7 @@ package net.helenus.core; import static net.helenus.core.HelenusSession.deleted; import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ResultSet; import com.diffplug.common.base.Errors; import com.google.common.base.Stopwatch; import com.google.common.collect.HashBasedTable; @@ -25,10 +26,12 @@ import com.google.common.collect.Table; import com.google.common.collect.TreeTraverser; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; import net.helenus.core.operation.AbstractOperation; +import net.helenus.core.operation.BatchOperation; import net.helenus.support.Either; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +59,7 @@ public abstract class AbstractUnitOfWork private boolean aborted = false; private boolean committed = false; private long committedAt = 0L; - private List> operations = new ArrayList>(); + private BatchOperation batch; protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork parent) { Objects.requireNonNull(session, "containing session cannot be null"); @@ -269,7 +272,10 @@ public abstract class AbstractUnitOfWork } public void batch(AbstractOperation s) { - operations.add(s); + if (batch == null) { + batch = new BatchOperation(session); + } + batch.add(s); } private Iterator> getChildNodes() { @@ -282,17 +288,11 @@ public abstract class AbstractUnitOfWork * @return a function from which to chain work that only happens when commit is successful * @throws E when the work overlaps with other concurrent writers. */ - public PostCommitFunction commit() throws E { + public PostCommitFunction commit() throws E, TimeoutException { - if (operations != null && operations.size() > 0) { - if (parent == null) { - BatchStatement batch = new BatchStatement(); - batch.addAll(operations.stream().map(o -> o.buildStatement(false)).collect(Collectors.toList())); - batch.setConsistencyLevel(session.getDefaultConsistencyLevel()); - session.getSession().execute(batch); - } else { - parent.operations.addAll(operations); - } + if (batch != null) { + committedAt = batch.sync(this); + //TODO(gburd) update cache with writeTime... } // All nested UnitOfWork should be committed (not aborted) before calls to @@ -337,6 +337,7 @@ public abstract class AbstractUnitOfWork // Merge cache and statistics into parent if there is one. parent.mergeCache(cache); + parent.addBatched(batch); if (purpose != null) { parent.nestedPurposes.add(purpose); } @@ -362,7 +363,15 @@ public abstract class AbstractUnitOfWork return new PostCommitFunction(this, postCommit); } - /* Explicitly discard the work and mark it as as such in the log. */ + private void addBatched(BatchOperation batch) { + if (this.batch == null) { + this.batch = batch; + } else { + this.batch.addAll(batch); + } + } + + /* Explicitly discard the work and mark it as as such in the log. */ public synchronized void abort() { TreeTraverser> traverser = TreeTraverser.using(node -> node::getChildNodes); diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 007613a..1c66a60 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -19,6 +19,8 @@ import com.datastax.driver.core.Statement; import com.google.common.base.Stopwatch; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeoutException; + import net.helenus.core.cache.Facet; import net.helenus.core.operation.AbstractOperation; @@ -40,7 +42,7 @@ public interface UnitOfWork extends AutoCloseable { * @return a function from which to chain work that only happens when commit is successful * @throws X when the work overlaps with other concurrent writers. */ - PostCommitFunction commit() throws X; + PostCommitFunction commit() throws X, TimeoutException; /** * Explicitly abort the work within this unit of work. Any nested aborted unit of work will diff --git a/src/main/java/net/helenus/core/cache/CacheUtil.java b/src/main/java/net/helenus/core/cache/CacheUtil.java index 78ae064..a51b34d 100644 --- a/src/main/java/net/helenus/core/cache/CacheUtil.java +++ b/src/main/java/net/helenus/core/cache/CacheUtil.java @@ -81,6 +81,13 @@ public class CacheUtil { return combinations; } + /** + * Merge changed values in the map behind `from` into `to`. + * + * @param to + * @param from + * @return + */ public static Object merge(Object to, Object from) { if (to == from) { return to; @@ -112,7 +119,5 @@ public class CacheUtil { return "_" + propertyName + "_writeTime"; } - public static String ttlKey(String propertyName) { - return "_" + propertyName + "_ttl"; - } + public static String ttlKey(String propertyName) { return "_" + propertyName + "_ttl"; } } diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index def7fff..dcd168e 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -33,6 +33,7 @@ import net.helenus.core.AbstractSessionOperations; import net.helenus.core.UnitOfWork; import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; +import net.helenus.support.Fun; public abstract class AbstractOptionalOperation> extends AbstractStatementOperation { @@ -98,9 +99,12 @@ public abstract class AbstractOptionalOperation facets = getFacets(); - if (facets != null && facets.size() > 1) { - sessionOps.updateCache(result.get(), facets); + E r = result.get(); + if (!(r instanceof Fun)) { + List facets = getFacets(); + if (facets != null && facets.size() > 1) { + sessionOps.updateCache(r, facets); + } } } return result; @@ -186,8 +190,11 @@ public abstract class AbstractOptionalOperation> extends Operation { - - protected boolean showValues = true; - protected TraceContext traceContext; - long queryExecutionTimeout = 10; - TimeUnit queryTimeoutUnits = TimeUnit.SECONDS; private boolean ignoreCache = false; private ConsistencyLevel consistencyLevel; private ConsistencyLevel serialConsistencyLevel; diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index b246937..fb59dd4 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -34,6 +34,7 @@ import net.helenus.core.AbstractSessionOperations; import net.helenus.core.UnitOfWork; import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; +import net.helenus.support.Fun; public abstract class AbstractStreamOperation> extends AbstractStatementOperation { @@ -104,7 +105,9 @@ public abstract class AbstractStreamOperation again = new ArrayList<>(); resultStream.forEach( result -> { - sessionOps.updateCache(result, facets); + if (!(result instanceof Fun)) { + sessionOps.updateCache(result, facets); + } again.add(result); }); resultStream = again.stream(); @@ -184,18 +187,18 @@ public abstract class AbstractStreamOperation again = new ArrayList<>(); - List facets = getFacets(); - resultStream.forEach( - result -> { - if (result != deleted) { - if (updateCache) { - cacheUpdate(uow, result, facets); - } - again.add(result); - } - }); - resultStream = again.stream(); + if (updateCache) { + List again = new ArrayList<>(); + List facets = getFacets(); + resultStream.forEach( + result -> { + if (result != deleted && !(result instanceof Fun)) { + cacheUpdate(uow, result, facets); + } + again.add(result); + }); + resultStream = again.stream(); + } } return resultStream; diff --git a/src/main/java/net/helenus/core/operation/BatchOperation.java b/src/main/java/net/helenus/core/operation/BatchOperation.java new file mode 100644 index 0000000..e58eeb3 --- /dev/null +++ b/src/main/java/net/helenus/core/operation/BatchOperation.java @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.core.operation; + +import com.codahale.metrics.Timer; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.querybuilder.BuiltStatement; +import net.helenus.core.AbstractSessionOperations; +import net.helenus.core.UnitOfWork; +import net.helenus.support.HelenusException; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public class BatchOperation extends Operation { + private BatchStatement batch = null; + private List> operations = new ArrayList>(); + private boolean logged = false; + private long timestamp = 0L; + + public BatchOperation(AbstractSessionOperations sessionOperations) { + super(sessionOperations); + } + + public void add(AbstractOperation operation) { + operations.add(operation); + } + + @Override + public BatchStatement buildStatement(boolean cached) { + batch = new BatchStatement(); + batch.addAll(operations.stream().map(o -> o.buildStatement(cached)).collect(Collectors.toList())); + batch.setConsistencyLevel(sessionOps.getDefaultConsistencyLevel()); + timestamp = batch.getDefaultTimestamp(); + return batch; + } + + public BatchOperation logged() { + logged = true; + return this; + } + + public BatchOperation setLogged(boolean logStatements) { + logged = logStatements; + return this; + } + + public Long sync() throws TimeoutException { + if (operations.size() == 0) return 0L; + final Timer.Context context = requestLatency.time(); + try { + ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false); + if (!resultSet.wasApplied()) { + throw new HelenusException("Failed to apply batch."); + } + } finally { + context.stop(); + } + return timestamp; + } + + public Long sync(UnitOfWork uow) throws TimeoutException { + if (operations.size() == 0) return 0L; + if (uow == null) + return sync(); + + final Timer.Context context = requestLatency.time(); + try { + ResultSet resultSet = this.execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false); + if (!resultSet.wasApplied()) { + throw new HelenusException("Failed to apply batch."); + } + } finally { + context.stop(); + } + return timestamp; + } + + public void addAll(BatchOperation batch) { + batch.operations.forEach(o -> this.operations.add(o)); + } + + public String toString() { + StringBuilder s = new StringBuilder(); + s.append("BEGIN "); + if (!logged) { s.append("UN"); } + s.append("LOGGED BATCH; "); + s.append(operations.stream().map(o -> Operation.queryString(o.buildStatement(false), showValues)).collect(Collectors.joining("; "))); + s.append(" APPLY BATCH;"); + return s.toString(); + } +} diff --git a/src/main/java/net/helenus/core/operation/InsertOperation.java b/src/main/java/net/helenus/core/operation/InsertOperation.java index f534944..0628c97 100644 --- a/src/main/java/net/helenus/core/operation/InsertOperation.java +++ b/src/main/java/net/helenus/core/operation/InsertOperation.java @@ -56,6 +56,7 @@ public final class InsertOperation extends AbstractOperation extends AbstractOperation extends AbstractOperation> converter = - prop.getReadConverter(sessionOps.getSessionRepository()); + Optional> converter = prop.getReadConverter(sessionOps.getSessionRepository()); if (converter.isPresent()) { backingMap.put(key, converter.get().apply(backingMap.get(key))); } @@ -200,8 +201,7 @@ public final class InsertOperation extends AbstractOperation propType = prop.getJavaType(); @@ -250,7 +250,7 @@ public final class InsertOperation extends AbstractOperation propertyNames = values.stream() .map(t -> t._1.getProperty()) .filter(prop -> { @@ -262,15 +262,15 @@ public final class InsertOperation extends AbstractOperation prop.getColumnName().toCql(true)) + .map(prop -> prop.getColumnName().toCql(false)) .collect(Collectors.toList()); if (propertyNames.size() > 0) { if (ttl != null) { propertyNames.forEach(name -> pojo.put(CacheUtil.ttlKey(name), ttl)); } - if (timestamp != null) { - propertyNames.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), timestamp)); + if (writeTime != 0L) { + propertyNames.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), writeTime)); } } } @@ -280,8 +280,8 @@ public final class InsertOperation extends AbstractOperation extends AbstractOperation iface = entity.getMappingInterface(); if (resultType == iface) { + adjustTtlAndWriteTime((MapExportable)result); cacheUpdate(uow, result, bindFacetValues()); - adjustTtlAndWriteTime((MapExportable)pojo); } else { if (entity.isCacheable()) { sessionOps.cacheEvict(bindFacetValues()); @@ -321,8 +321,8 @@ public final class InsertOperation extends AbstractOperation iface = this.entity.getMappingInterface(); if (resultType == iface) { - cacheUpdate(uow, pojo, bindFacetValues()); adjustTtlAndWriteTime((MapExportable)pojo); + cacheUpdate(uow, pojo, bindFacetValues()); uow.batch(this); return (T) pojo; } diff --git a/src/main/java/net/helenus/core/operation/Operation.java b/src/main/java/net/helenus/core/operation/Operation.java index 993ab8f..e25489b 100644 --- a/src/main/java/net/helenus/core/operation/Operation.java +++ b/src/main/java/net/helenus/core/operation/Operation.java @@ -42,6 +42,10 @@ public abstract class Operation { private static final Logger LOG = LoggerFactory.getLogger(Operation.class); protected final AbstractSessionOperations sessionOps; + protected boolean showValues = true; + protected TraceContext traceContext; + protected long queryExecutionTimeout = 10; + protected TimeUnit queryTimeoutUnits = TimeUnit.SECONDS; protected final Meter uowCacheHits; protected final Meter uowCacheMiss; protected final Meter sessionCacheHits; @@ -177,7 +181,7 @@ public abstract class Operation { timerString = String.format(" %s ", timer.toString()); } LOG.info( - String.format("%s%s%s", uowString, timerString, Operation.queryString(statement, false))); + String.format("%s%s%s", uowString, timerString, Operation.queryString(statement, showValues))); } } diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index 3ab84e8..15ea583 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -94,13 +94,10 @@ public final class SelectOperation extends AbstractFilterStreamOperation this.props.add(p)); this.isCacheable = entity.isCacheable(); - this.implementsEntityType = MappingUtil.extendsInterface(entity.getMappingInterface(), Entity.class); + this.implementsEntityType = Entity.class.isAssignableFrom(entity.getMappingInterface()); } - public SelectOperation( - AbstractSessionOperations sessionOperations, - HelenusEntity entity, - Function rowMapper) { + public SelectOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity, Function rowMapper) { super(sessionOperations); this.rowMapper = rowMapper; @@ -112,7 +109,7 @@ public final class SelectOperation extends AbstractFilterStreamOperation this.props.add(p)); this.isCacheable = entity.isCacheable(); - this.implementsEntityType = MappingUtil.extendsInterface(entity.getMappingInterface(), Entity.class); + this.implementsEntityType = Entity.class.isAssignableFrom(entity.getMappingInterface()); } public SelectOperation(AbstractSessionOperations sessionOperations, Function rowMapper, @@ -125,7 +122,7 @@ public final class SelectOperation extends AbstractFilterStreamOperation extends AbstractFilterOperation extends AbstractFilterOperation extends AbstractFilterOperation names = new ArrayList(assignments.size()); for (BoundFacet facet : assignments.values()) { for (HelenusProperty prop : facet.getProperties()) { - names.add(prop.getColumnName().toCql(true)); + names.add(prop.getColumnName().toCql(false)); } } @@ -772,8 +774,8 @@ public final class UpdateOperation extends AbstractFilterOperation pojo.put(CacheUtil.ttlKey(name), ttl)); } - if (timestamp != null) { - names.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), timestamp)); + if (writeTime != 0L) { + names.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), writeTime)); } } } @@ -803,8 +805,11 @@ public final class UpdateOperation extends AbstractFilterOperation implements InvocationHandler, Serializab key = CacheUtil.writeTimeKey((String)args[0]); } else if (args[0] instanceof Getter) { Getter getter = (Getter)args[0]; - key = CacheUtil.writeTimeKey(MappingUtil.resolveMappingProperty(getter).getProperty().getPropertyName()); + key = CacheUtil.writeTimeKey(MappingUtil.resolveMappingProperty(getter).getProperty().getColumnName().toCql(false)); } else { return 0L; } - long[] v = (long[])src.get(key); + Long v = (Long)src.get(key); if (v != null) { - return v[0]; + return v; } return 0L; } @@ -139,7 +139,7 @@ public class MapperInvocationHandler implements InvocationHandler, Serializab key = CacheUtil.ttlKey((String)args[0]); } else if (args[0] instanceof Getter) { Getter getter = (Getter)args[0]; - key = CacheUtil.ttlKey(MappingUtil.resolveMappingProperty(getter).getProperty().getColumnName().toCql(true)); + key = CacheUtil.ttlKey(MappingUtil.resolveMappingProperty(getter).getProperty().getColumnName().toCql(false)); } else { return 0; } diff --git a/src/main/java/net/helenus/mapping/MappingUtil.java b/src/main/java/net/helenus/mapping/MappingUtil.java index 03cc8d5..2461492 100644 --- a/src/main/java/net/helenus/mapping/MappingUtil.java +++ b/src/main/java/net/helenus/mapping/MappingUtil.java @@ -314,15 +314,6 @@ public final class MappingUtil { } } - public static boolean extendsInterface(Class clazz, Class iface) { - Class[] interfaces = clazz.getInterfaces(); - for (Class i : interfaces) { - if (i == iface) - return true; - } - return false; - } - private static void rethrow(Throwable cause) throws CloneNotSupportedException { if (cause instanceof RuntimeException) { throw (RuntimeException) cause; diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java index 8f24484..c33ec5a 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -347,19 +347,19 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { @Test public void testBatchingUpdatesAndInserts() throws Exception { - Widget w1, w2, w3, w4, w5; + Widget w1, w2, w3, w4, w5, w6; Long committedAt = 0L; UUID key = UUIDs.timeBased(); try (UnitOfWork uow = session.begin()) { - w1 = session.upsert(widget) - .value(widget::id, key) - .value(widget::name, RandomString.make(20)) - .value(widget::a, RandomString.make(10)) - .value(widget::b, RandomString.make(10)) - .value(widget::c, RandomString.make(10)) - .value(widget::d, RandomString.make(10)) - .batch(uow); + w1 = session.upsert(widget) + .value(widget::id, key) + .value(widget::name, RandomString.make(20)) + .value(widget::a, RandomString.make(10)) + .value(widget::b, RandomString.make(10)) + .value(widget::c, RandomString.make(10)) + .value(widget::d, RandomString.make(10)) + .batch(uow); Assert.assertTrue(0L == w1.writtenAt(widget::name)); Assert.assertTrue(0 == w1.ttlOf(widget::name)); w2 = session.update(w1) @@ -378,7 +378,17 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { Assert.assertEquals(w2, w3); Assert.assertTrue(0L == w3.writtenAt(widget::name)); Assert.assertTrue(30 <= w3.ttlOf(widget::name)); - uow.commit(); + + w6 = session.upsert(widget) + .value(widget::id, UUIDs.timeBased()) + .value(widget::name, RandomString.make(20)) + .value(widget::a, RandomString.make(10)) + .value(widget::b, RandomString.make(10)) + .value(widget::c, RandomString.make(10)) + .value(widget::d, RandomString.make(10)) + .batch(uow); + + uow.commit(); committedAt = uow.committedAt(); } // 'c' is distinct, but not on it's own so this should miss cache @@ -401,6 +411,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { Assert.assertTrue(w5.writtenAt(widget::name) == committedAt); int ttl5 = w5.ttlOf(widget::name); Assert.assertTrue(ttl5 <= 30); + Assert.assertTrue(w4.writtenAt(widget::name) == w6.writtenAt(widget::name)); } @Test @@ -414,6 +425,7 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .value(widget::id, key1) .value(widget::name, RandomString.make(20)) .sync(uow); + /* w2 = session.upsert(w1) .value(widget::a, RandomString.make(10)) .value(widget::b, RandomString.make(10)) @@ -421,8 +433,10 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { .value(widget::d, RandomString.make(10)) .sync(uow); uow.commit(); + */ + uow.abort(); } - //TODO(gburd): Assert.assertEquals(w1, w2); + //Assert.assertEquals(w1, w2); } @Test public void testSelectAfterInsertProperlyCachesEntity() throws