From 844ebd9155f42142d026aec2fcf10e96dbd8affa Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Fri, 22 Sep 2017 09:46:59 -0400 Subject: [PATCH] Add support for marking queries as idempotent so that the C* driver will apply the retry policy and in some cases perform speculative execution. (see: https://docs.datastax.com/en/developer/java-driver/3.1/manual/retries/ and https://docs.datastax.com/en/developer/java-driver/3.1/manual/speculative_execution/). --- pom.xml | 2 +- .../helenus/core/AbstractSessionOperations.java | 2 ++ .../java/net/helenus/core/HelenusSession.java | 11 ++++++++--- .../net/helenus/core/SessionInitializer.java | 11 +++++++++++ .../operation/AbstractStatementOperation.java | 16 ++++++++++++++++ 5 files changed, 38 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index 566445e..c69d0ba 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 net.helenus helenus-core - 2.0.41-SNAPSHOT + 2.0.42-SNAPSHOT jar helenus diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index 920fabd..1bba0ab 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -51,6 +51,8 @@ public abstract class AbstractSessionOperations { public abstract ConsistencyLevel getDefaultConsistencyLevel(); + public abstract boolean getDefaultQueryIdempotency(); + public PreparedStatement prepare(RegularStatement statement) { try { log(statement, false); diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index e2a2104..db08f8e 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -56,6 +56,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C private volatile String usingKeyspace; private volatile boolean showCql; private final ConsistencyLevel defaultConsistencyLevel; + private final boolean defaultQueryIdempotency; private final MetricRegistry metricRegistry; private final Tracer zipkinTracer; private final PrintStream printStream; @@ -79,6 +80,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C Executor executor, boolean dropSchemaOnClose, ConsistencyLevel consistencyLevel, + boolean defaultQueryIdempotency, Class unitOfWorkClass, MetricRegistry metricRegistry, Tracer tracer) { @@ -93,6 +95,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C this.executor = executor; this.dropSchemaOnClose = dropSchemaOnClose; this.defaultConsistencyLevel = consistencyLevel; + this.defaultQueryIdempotency = defaultQueryIdempotency; this.unitOfWorkClass = unitOfWorkClass; this.metricRegistry = metricRegistry; this.zipkinTracer = tracer; @@ -168,13 +171,15 @@ public final class HelenusSession extends AbstractSessionOperations implements C return metricRegistry; } + @Override public ConsistencyLevel getDefaultConsistencyLevel() { return defaultConsistencyLevel; } - public Metadata getMetadata() { - return metadata; - } + @Override + public boolean getDefaultQueryIdempotency() { return defaultQueryIdempotency; } + + public Metadata getMetadata() { return metadata; } public synchronized UnitOfWork begin() { return begin(null); diff --git a/src/main/java/net/helenus/core/SessionInitializer.java b/src/main/java/net/helenus/core/SessionInitializer.java index 677eb1b..bdfd23e 100644 --- a/src/main/java/net/helenus/core/SessionInitializer.java +++ b/src/main/java/net/helenus/core/SessionInitializer.java @@ -39,6 +39,7 @@ public final class SessionInitializer extends AbstractSessionOperations { private String usingKeyspace; private boolean showCql = false; private ConsistencyLevel consistencyLevel; + private boolean idempotent = true; private MetricRegistry metricRegistry = new MetricRegistry(); private Tracer zipkinTracer; private PrintStream printStream = System.out; @@ -125,6 +126,15 @@ public final class SessionInitializer extends AbstractSessionOperations { return consistencyLevel; } + public SessionInitializer idempotentQueryExecution(boolean idempotent) { + this.idempotent = idempotent; + return this; + } + + public boolean getDefaultQueryIdempotency() { + return idempotent; + } + @Override public PrintStream getPrintStream() { return printStream; @@ -241,6 +251,7 @@ public final class SessionInitializer extends AbstractSessionOperations { executor, autoDdl == AutoDdl.CREATE_DROP, consistencyLevel, + idempotent, unitOfWorkClass, metricRegistry, zipkinTracer); diff --git a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java index 19a0682..6a287f0 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java @@ -44,6 +44,7 @@ public abstract class AbstractStatementOperation