add options for common statement

This commit is contained in:
Albert Shift 2015-04-04 19:24:14 -07:00
parent c69ef2b19c
commit a32d5609ba
3 changed files with 93 additions and 4 deletions

View file

@ -52,14 +52,14 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
public E sync() { public E sync() {
ResultSet resultSet = sessionOps.executeAsync(buildStatement()).getUninterruptibly(); ResultSet resultSet = sessionOps.executeAsync(options(buildStatement())).getUninterruptibly();
return transform(resultSet); return transform(resultSet);
} }
public ListenableFuture<E> async() { public ListenableFuture<E> async() {
ResultSetFuture resultSetFuture = sessionOps.executeAsync(buildStatement()); ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()));
ListenableFuture<E> future = Futures.transform(resultSetFuture, new Function<ResultSet, E>() { ListenableFuture<E> future = Futures.transform(resultSetFuture, new Function<ResultSet, E>() {

View file

@ -15,9 +15,14 @@
*/ */
package com.noorq.casser.core.operation; package com.noorq.casser.core.operation;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement; import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.Statement; import com.datastax.driver.core.Statement;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.FallthroughRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.querybuilder.BuiltStatement; import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.noorq.casser.core.AbstractSessionOperations; import com.noorq.casser.core.AbstractSessionOperations;
@ -29,10 +34,94 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
public abstract Statement buildStatement(); public abstract Statement buildStatement();
private ConsistencyLevel consistencyLevel;
private RetryPolicy retryPolicy;
private boolean enableTracing = false;
public AbstractStatementOperation(AbstractSessionOperations sessionOperations) { public AbstractStatementOperation(AbstractSessionOperations sessionOperations) {
this.sessionOps = sessionOperations; this.sessionOps = sessionOperations;
} }
public O retryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return (O) this;
}
public O defaultRetryPolicy() {
this.retryPolicy = DefaultRetryPolicy.INSTANCE;
return (O) this;
}
public O downgradingConsistencyRetryPolicy() {
this.retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE;
return (O) this;
}
public O fallthroughRetryPolicy() {
this.retryPolicy = FallthroughRetryPolicy.INSTANCE;
return (O) this;
}
public O consistency(ConsistencyLevel level) {
this.consistencyLevel = level;
return (O) this;
}
public O consistencyAny() {
this.consistencyLevel = ConsistencyLevel.ANY;
return (O) this;
}
public O consistencyOne() {
this.consistencyLevel = ConsistencyLevel.ONE;
return (O) this;
}
public O consistencyQuorum() {
this.consistencyLevel = ConsistencyLevel.QUORUM;
return (O) this;
}
public O consistencyAll() {
this.consistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O disableTracing() {
this.enableTracing = false;
return (O) this;
}
public O enableTracing() {
this.enableTracing = true;
return (O) this;
}
public O tracing(boolean enable) {
this.enableTracing = enable;
return (O) this;
}
protected Statement options(Statement statement) {
if (consistencyLevel != null) {
statement.setConsistencyLevel(consistencyLevel);
}
if (retryPolicy != null) {
statement.setRetryPolicy(retryPolicy);
}
if (enableTracing) {
statement.enableTracing();
}
else {
statement.disableTracing();
}
return statement;
}
public String cql() { public String cql() {
Statement statement = buildStatement(); Statement statement = buildStatement();
if (statement instanceof BuiltStatement) { if (statement instanceof BuiltStatement) {

View file

@ -54,14 +54,14 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public Stream<E> sync() { public Stream<E> sync() {
ResultSet resultSet = sessionOps.executeAsync(buildStatement()).getUninterruptibly(); ResultSet resultSet = sessionOps.executeAsync(options(buildStatement())).getUninterruptibly();
return transform(resultSet); return transform(resultSet);
} }
public ListenableFuture<Stream<E>> async() { public ListenableFuture<Stream<E>> async() {
ResultSetFuture resultSetFuture = sessionOps.executeAsync(buildStatement()); ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()));
ListenableFuture<Stream<E>> future = Futures.transform(resultSetFuture, new Function<ResultSet, Stream<E>>() { ListenableFuture<Stream<E>> future = Futures.transform(resultSetFuture, new Function<ResultSet, Stream<E>>() {