Add support for marking queries as idempotent so that the C* driver will apply the retry policy and in some cases perform speculative execution. (see: https://docs.datastax.com/en/developer/java-driver/3.1/manual/retries/ and https://docs.datastax.com/en/developer/java-driver/3.1/manual/speculative_execution/).

This commit is contained in:
Greg Burd 2017-09-22 09:46:59 -04:00
parent a7094abdfa
commit 844ebd9155
5 changed files with 38 additions and 4 deletions

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.helenus</groupId>
<artifactId>helenus-core</artifactId>
<version>2.0.41-SNAPSHOT</version>
<version>2.0.42-SNAPSHOT</version>
<packaging>jar</packaging>
<name>helenus</name>

View file

@ -51,6 +51,8 @@ public abstract class AbstractSessionOperations {
public abstract ConsistencyLevel getDefaultConsistencyLevel();
public abstract boolean getDefaultQueryIdempotency();
public PreparedStatement prepare(RegularStatement statement) {
try {
log(statement, false);

View file

@ -56,6 +56,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
private volatile String usingKeyspace;
private volatile boolean showCql;
private final ConsistencyLevel defaultConsistencyLevel;
private final boolean defaultQueryIdempotency;
private final MetricRegistry metricRegistry;
private final Tracer zipkinTracer;
private final PrintStream printStream;
@ -79,6 +80,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
Executor executor,
boolean dropSchemaOnClose,
ConsistencyLevel consistencyLevel,
boolean defaultQueryIdempotency,
Class<? extends UnitOfWork> unitOfWorkClass,
MetricRegistry metricRegistry,
Tracer tracer) {
@ -93,6 +95,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
this.executor = executor;
this.dropSchemaOnClose = dropSchemaOnClose;
this.defaultConsistencyLevel = consistencyLevel;
this.defaultQueryIdempotency = defaultQueryIdempotency;
this.unitOfWorkClass = unitOfWorkClass;
this.metricRegistry = metricRegistry;
this.zipkinTracer = tracer;
@ -168,13 +171,15 @@ public final class HelenusSession extends AbstractSessionOperations implements C
return metricRegistry;
}
@Override
public ConsistencyLevel getDefaultConsistencyLevel() {
return defaultConsistencyLevel;
}
public Metadata getMetadata() {
return metadata;
}
@Override
public boolean getDefaultQueryIdempotency() { return defaultQueryIdempotency; }
public Metadata getMetadata() { return metadata; }
public synchronized UnitOfWork begin() {
return begin(null);

View file

@ -39,6 +39,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
private String usingKeyspace;
private boolean showCql = false;
private ConsistencyLevel consistencyLevel;
private boolean idempotent = true;
private MetricRegistry metricRegistry = new MetricRegistry();
private Tracer zipkinTracer;
private PrintStream printStream = System.out;
@ -125,6 +126,15 @@ public final class SessionInitializer extends AbstractSessionOperations {
return consistencyLevel;
}
public SessionInitializer idempotentQueryExecution(boolean idempotent) {
this.idempotent = idempotent;
return this;
}
public boolean getDefaultQueryIdempotency() {
return idempotent;
}
@Override
public PrintStream getPrintStream() {
return printStream;
@ -241,6 +251,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
executor,
autoDdl == AutoDdl.CREATE_DROP,
consistencyLevel,
idempotent,
unitOfWorkClass,
metricRegistry,
zipkinTracer);

View file

@ -44,6 +44,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
private ConsistencyLevel consistencyLevel;
private ConsistencyLevel serialConsistencyLevel;
private RetryPolicy retryPolicy;
private boolean idempotent = false;
private boolean enableTracing = false;
private long[] defaultTimestamp = null;
private int[] fetchSize = null;
@ -51,6 +52,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
public AbstractStatementOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
this.consistencyLevel = sessionOperations.getDefaultConsistencyLevel();
this.idempotent = sessionOperations.getDefaultQueryIdempotency();
}
@ -85,6 +87,16 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
return (O) this;
}
public O idempotent() {
this.idempotent = true;
return (O) this;
}
public O isIdempotent(boolean idempotent) {
this.idempotent = idempotent;
return (O) this;
}
public O downgradingConsistencyRetryPolicy() {
this.retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
return (O) this;
@ -219,6 +231,10 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
statement.setFetchSize(fetchSize[0]);
}
if (idempotent) {
statement.setIdempotent(true);
}
return statement;
}