diff --git a/src/main/java/net/helenus/core/operation/AbstractOperation.java b/src/main/java/net/helenus/core/operation/AbstractOperation.java index 8eb7961..7990c5a 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOperation.java @@ -17,7 +17,13 @@ package net.helenus.core.operation; import com.codahale.metrics.Timer; import com.datastax.driver.core.ResultSet; + +import java.sql.Time; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; + +import com.diffplug.common.base.Errors; import net.helenus.core.AbstractSessionOperations; import net.helenus.core.UnitOfWork; @@ -38,22 +44,22 @@ public abstract class AbstractOperation> return new PreparedOperation(prepareStatement(), this); } - public E sync() { + public E sync() throws TimeoutException { final Timer.Context context = requestLatency.time(); try { - ResultSet resultSet = this.execute(sessionOps, null, traceContext, showValues, false); + ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false); return transform(resultSet); } finally { context.stop(); } } - public E sync(UnitOfWork uow) { + public E sync(UnitOfWork uow) throws TimeoutException { if (uow == null) return sync(); final Timer.Context context = requestLatency.time(); try { - ResultSet resultSet = execute(sessionOps, uow, traceContext, showValues, true); + ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, true); E result = transform(resultSet); return result; } finally { @@ -62,11 +68,19 @@ public abstract class AbstractOperation> } public CompletableFuture async() { - return CompletableFuture.supplyAsync(() -> sync()); + return CompletableFuture.supplyAsync(() -> { + try { + return sync(); + } catch (TimeoutException ex) { throw new CompletionException(ex); } + }); } public CompletableFuture async(UnitOfWork uow) { if (uow == null) return async(); - return CompletableFuture.supplyAsync(() -> sync(uow)); + return CompletableFuture.supplyAsync(() -> { + try { + return sync(); + } catch (TimeoutException ex) { throw new CompletionException(ex); } + }); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index 0827105..349469a 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -26,6 +26,9 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; + import net.helenus.core.AbstractSessionOperations; import net.helenus.core.Filter; import net.helenus.core.Helenus; @@ -63,17 +66,17 @@ public abstract class AbstractOptionalOperation sync() { + public Optional sync() throws TimeoutException { final Timer.Context context = requestLatency.time(); try { - ResultSet resultSet = this.execute(sessionOps, null, traceContext, showValues, false); + ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false); return transform(resultSet); } finally { context.stop(); } } - public Optional sync(UnitOfWork uow) { + public Optional sync(UnitOfWork uow) throws TimeoutException { if (uow == null) return sync(); final Timer.Context context = requestLatency.time(); @@ -120,8 +123,7 @@ public abstract class AbstractOptionalOperation> async() { - return CompletableFuture.>supplyAsync(() -> sync()); + return CompletableFuture.>supplyAsync(() -> { + try { + return sync(); + } catch (TimeoutException ex) { throw new CompletionException(ex); } + }); } public CompletableFuture> async(UnitOfWork uow) { if (uow == null) return async(); - return CompletableFuture.>supplyAsync(() -> sync(uow)); + return CompletableFuture.>supplyAsync(() -> { + try { + return sync(); + } catch (TimeoutException ex) { throw new CompletionException(ex); } + }); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java index 5177724..bba8f9b 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java @@ -32,6 +32,8 @@ import net.helenus.support.HelenusException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + public abstract class AbstractStatementOperation> extends Operation { @@ -49,6 +51,8 @@ public abstract class AbstractStatementOperation sync() { + public Stream sync() throws TimeoutException { final Timer.Context context = requestLatency.time(); try { - ResultSet resultSet = this.execute(sessionOps, null, traceContext, showValues, false); + ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false); return transform(resultSet); } finally { context.stop(); } } - public Stream sync(UnitOfWork uow) { + public Stream sync(UnitOfWork uow) throws TimeoutException { if (uow == null) return sync(); final Timer.Context context = requestLatency.time(); @@ -82,7 +84,7 @@ public abstract class AbstractStreamOperation> async() { - return CompletableFuture.>supplyAsync(() -> sync()); + return CompletableFuture.>supplyAsync(() -> { + try { + return sync(); + } catch (TimeoutException ex) { throw new CompletionException(ex); } + }); } public CompletableFuture> async(UnitOfWork uow) { if (uow == null) return async(); - return CompletableFuture.>supplyAsync(() -> sync(uow)); + return CompletableFuture.>supplyAsync(() -> { + try { + return sync(); + } catch (TimeoutException ex) { throw new CompletionException(ex); } + }); } } diff --git a/src/main/java/net/helenus/core/operation/InsertOperation.java b/src/main/java/net/helenus/core/operation/InsertOperation.java index 53abaee..61b883c 100644 --- a/src/main/java/net/helenus/core/operation/InsertOperation.java +++ b/src/main/java/net/helenus/core/operation/InsertOperation.java @@ -21,6 +21,7 @@ import com.datastax.driver.core.querybuilder.Insert; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.common.base.Joiner; import java.util.*; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import net.helenus.core.AbstractSessionOperations; import net.helenus.core.Getter; @@ -257,7 +258,7 @@ public final class InsertOperation extends AbstractOperation { @@ -34,8 +37,10 @@ public abstract class Operation { AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, + long timeout, + TimeUnit units, boolean showValues, - boolean cached) { + boolean cached) throws TimeoutException { // Start recording in a Zipkin sub-span our execution time to perform this operation. Tracer tracer = session.getZipkinTracer(); @@ -46,19 +51,14 @@ public abstract class Operation { try { - if (span != null) { - span.name("cassandra"); - span.start(); - } - - Statement statement = options(buildStatement(cached)); - ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); - return futureResultSet.get(); - - } catch (InterruptedException | ExecutionException e) { - - throw new RuntimeException(e); + if (span != null) { + span.name("cassandra"); + span.start(); + } + Statement statement = options(buildStatement(cached)); + ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); + return futureResultSet.getUninterruptibly(timeout, units); } finally { if (span != null) { diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index ea23ca5..dcb916c 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -22,6 +22,7 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; import com.datastax.driver.core.querybuilder.Select.Selection; import com.datastax.driver.core.querybuilder.Select.Where; +import com.google.common.base.Joiner; import java.util.*; import java.util.function.Function; import java.util.stream.Stream; diff --git a/src/main/java/net/helenus/core/operation/UpdateOperation.java b/src/main/java/net/helenus/core/operation/UpdateOperation.java index b918bc6..cf84b47 100644 --- a/src/main/java/net/helenus/core/operation/UpdateOperation.java +++ b/src/main/java/net/helenus/core/operation/UpdateOperation.java @@ -21,6 +21,7 @@ import com.datastax.driver.core.querybuilder.BuiltStatement; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Update; import java.util.*; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import net.helenus.core.*; import net.helenus.core.reflect.HelenusPropertyNode; @@ -578,7 +579,7 @@ public final class UpdateOperation extends AbstractFilterOperation