diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index 912f12f..3f8eba2 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -86,11 +86,11 @@ public abstract class AbstractSessionOperations { return execute(statement, null, timer, showValues); } - public ResultSet execute(Statement statement, UnitOfWork uow, boolean showValues) { + public ResultSet execute(Statement statement, UnitOfWork uow, boolean showValues) { return execute(statement, uow, null, showValues); } - public ResultSet execute(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) { + public ResultSet execute(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) { return executeAsync(statement, uow, timer, showValues).getUninterruptibly(); } @@ -102,11 +102,11 @@ public abstract class AbstractSessionOperations { return executeAsync(statement, null, timer, showValues); } - public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, boolean showValues) { + public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, boolean showValues) { return executeAsync(statement, uow, null, showValues); } - public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) { + public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) { try { logStatement(statement, showValues); return currentSession().executeAsync(statement); diff --git a/src/main/java/net/helenus/core/ConflictingUnitOfWorkException.java b/src/main/java/net/helenus/core/ConflictingUnitOfWorkException.java index bd52799..e8ab229 100644 --- a/src/main/java/net/helenus/core/ConflictingUnitOfWorkException.java +++ b/src/main/java/net/helenus/core/ConflictingUnitOfWorkException.java @@ -1,10 +1,26 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package net.helenus.core; public class ConflictingUnitOfWorkException extends Exception { - final UnitOfWork uow; + final UnitOfWork uow; - ConflictingUnitOfWorkException(UnitOfWork uow) { + ConflictingUnitOfWorkException(UnitOfWork uow) { this.uow = uow; } } diff --git a/src/main/java/net/helenus/core/PostCommitFunction.java b/src/main/java/net/helenus/core/PostCommitFunction.java index f45f033..a859608 100644 --- a/src/main/java/net/helenus/core/PostCommitFunction.java +++ b/src/main/java/net/helenus/core/PostCommitFunction.java @@ -5,10 +5,10 @@ import java.util.Objects; public class PostCommitFunction implements java.util.function.Function { - private final UnitOfWork uow; + private final UnitOfWork uow; private final List postCommit; - PostCommitFunction(UnitOfWork uow, List postCommit) { + PostCommitFunction(UnitOfWork uow, List postCommit) { this.uow = uow; this.postCommit = postCommit; } diff --git a/src/main/java/net/helenus/core/annotation/Retry.java b/src/main/java/net/helenus/core/annotation/Retry.java index 7db5b20..6ca6308 100644 --- a/src/main/java/net/helenus/core/annotation/Retry.java +++ b/src/main/java/net/helenus/core/annotation/Retry.java @@ -1,9 +1,26 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package net.helenus.core.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.util.concurrent.TimeoutException; import net.helenus.core.ConflictingUnitOfWorkException; @@ -11,7 +28,7 @@ import net.helenus.core.ConflictingUnitOfWorkException; @Target(ElementType.METHOD) public @interface Retry { - Class[] on() default ConflictingUnitOfWorkException.class; + Class[] on() default {ConflictingUnitOfWorkException.class, TimeoutException.class}; int times() default 3; } diff --git a/src/main/java/net/helenus/core/aspect/RetryAspect.java b/src/main/java/net/helenus/core/aspect/RetryAspect.java index 587d087..ed7d50e 100644 --- a/src/main/java/net/helenus/core/aspect/RetryAspect.java +++ b/src/main/java/net/helenus/core/aspect/RetryAspect.java @@ -1,3 +1,19 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package net.helenus.core.aspect; import java.lang.reflect.Method; diff --git a/src/main/java/net/helenus/core/operation/AbstractOperation.java b/src/main/java/net/helenus/core/operation/AbstractOperation.java index 9d834d8..27d7eaf 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOperation.java @@ -16,6 +16,8 @@ package net.helenus.core.operation; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; import com.codahale.metrics.Timer; import com.datastax.driver.core.ResultSet; @@ -35,7 +37,7 @@ public abstract class AbstractOperation> ex return new PreparedOperation(prepareStatement(), this); } - public E sync() {// throws TimeoutException { + public E sync() throws TimeoutException { final Timer.Context context = requestLatency.time(); try { ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, @@ -46,7 +48,7 @@ public abstract class AbstractOperation> ex } } - public E sync(UnitOfWork uow) {// throws TimeoutException { + public E sync(UnitOfWork uow) throws TimeoutException { if (uow == null) return sync(); @@ -63,23 +65,23 @@ public abstract class AbstractOperation> ex public CompletableFuture async() { return CompletableFuture.supplyAsync(() -> { - // try { - return sync(); - // } catch (TimeoutException ex) { - // throw new CompletionException(ex); - // } + try { + return sync(); + } catch (TimeoutException ex) { + throw new CompletionException(ex); + } }); } - public CompletableFuture async(UnitOfWork uow) { + public CompletableFuture async(UnitOfWork uow) { if (uow == null) return async(); return CompletableFuture.supplyAsync(() -> { - // try { - return sync(); - // } catch (TimeoutException ex) { - // throw new CompletionException(ex); - // } + try { + return sync(); + } catch (TimeoutException ex) { + throw new CompletionException(ex); + } }); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index dd7907c..1591404 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -20,6 +20,8 @@ import static net.helenus.core.HelenusSession.deleted; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; import com.codahale.metrics.Timer; import com.datastax.driver.core.PreparedStatement; @@ -59,7 +61,7 @@ public abstract class AbstractOptionalOperation sync() {// throws TimeoutException { + public Optional sync() throws TimeoutException { final Timer.Context context = requestLatency.time(); try { Optional result = Optional.empty(); @@ -102,7 +104,7 @@ public abstract class AbstractOptionalOperation sync(UnitOfWork uow) {// throws TimeoutException { + public Optional sync(UnitOfWork uow) throws TimeoutException { if (uow == null) return sync(); @@ -163,19 +165,19 @@ public abstract class AbstractOptionalOperation> async() { return CompletableFuture.>supplyAsync(() -> { - // try { - return sync(); - // } catch (TimeoutException ex) { - // throw new CompletionException(ex); - // } + try { + return sync(); + } catch (TimeoutException ex) { + throw new CompletionException(ex); + } }); } @@ -197,11 +199,11 @@ public abstract class AbstractOptionalOperation>supplyAsync(() -> { - // try { - return sync(); - // } catch (TimeoutException ex) { - // throw new CompletionException(ex); - // } + try { + return sync(); + } catch (TimeoutException ex) { + throw new CompletionException(ex); + } }); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index ad66dbb..749f0ff 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -20,6 +20,8 @@ import static net.helenus.core.HelenusSession.deleted; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import com.codahale.metrics.Timer; @@ -60,7 +62,7 @@ public abstract class AbstractStreamOperation sync() {// throws TimeoutException { + public Stream sync() throws TimeoutException { final Timer.Context context = requestLatency.time(); try { Stream resultStream = null; @@ -109,7 +111,7 @@ public abstract class AbstractStreamOperation sync(UnitOfWork uow) {// throws TimeoutException { + public Stream sync(UnitOfWork uow) throws TimeoutException { if (uow == null) return sync(); @@ -162,26 +164,26 @@ public abstract class AbstractStreamOperation again = new ArrayList<>(); - List facets = getFacets(); - resultStream.forEach(result -> { - if (result != deleted) { - if (updateCache) { - cacheUpdate(uow, result, facets); - } - again.add(result); - } - }); - resultStream = again.stream(); - } + // If we have a result and we're caching then we need to put it into the cache + // for future requests to find. + if (resultStream != null) { + List again = new ArrayList<>(); + List facets = getFacets(); + resultStream.forEach(result -> { + if (result != deleted) { + if (updateCache) { + cacheUpdate(uow, result, facets); + } + again.add(result); + } + }); + resultStream = again.stream(); + } return resultStream; } finally { @@ -191,23 +193,23 @@ public abstract class AbstractStreamOperation> async() { return CompletableFuture.>supplyAsync(() -> { - // try { - return sync(); - // } catch (TimeoutException ex) { - // throw new CompletionException(ex); - // } + try { + return sync(); + } catch (TimeoutException ex) { + throw new CompletionException(ex); + } }); } - public CompletableFuture> async(UnitOfWork uow) { + public CompletableFuture> async(UnitOfWork uow) { if (uow == null) return async(); return CompletableFuture.>supplyAsync(() -> { - // try { - return sync(); - // } catch (TimeoutException ex) { - // throw new CompletionException(ex); - // } + try { + return sync(); + } catch (TimeoutException ex) { + throw new CompletionException(ex); + } }); } } diff --git a/src/main/java/net/helenus/core/operation/DeleteOperation.java b/src/main/java/net/helenus/core/operation/DeleteOperation.java index e180ff0..9375a30 100644 --- a/src/main/java/net/helenus/core/operation/DeleteOperation.java +++ b/src/main/java/net/helenus/core/operation/DeleteOperation.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.querybuilder.BuiltStatement; @@ -168,7 +169,7 @@ public final class DeleteOperation extends AbstractFilterOperation uow) {// throws TimeoutException { + public ResultSet sync(UnitOfWork uow) throws TimeoutException { if (uow == null) { return sync(); } diff --git a/src/main/java/net/helenus/core/operation/InsertOperation.java b/src/main/java/net/helenus/core/operation/InsertOperation.java index 462449c..89fb3c3 100644 --- a/src/main/java/net/helenus/core/operation/InsertOperation.java +++ b/src/main/java/net/helenus/core/operation/InsertOperation.java @@ -16,6 +16,7 @@ package net.helenus.core.operation; import java.util.*; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import com.datastax.driver.core.ResultSet; @@ -236,7 +237,7 @@ public final class InsertOperation extends AbstractOperation extends AbstractOperation uow) {// throws TimeoutException { + public T sync(UnitOfWork uow) throws TimeoutException { if (uow == null) { return sync(); } diff --git a/src/main/java/net/helenus/core/operation/Operation.java b/src/main/java/net/helenus/core/operation/Operation.java index 61e12c4..00262bb 100644 --- a/src/main/java/net/helenus/core/operation/Operation.java +++ b/src/main/java/net/helenus/core/operation/Operation.java @@ -18,6 +18,7 @@ package net.helenus.core.operation; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,8 +88,8 @@ public abstract class Operation { return query; } - public ResultSet execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, - long timeout, TimeUnit units, boolean showValues, boolean cached) { // throws TimeoutException { + public ResultSet execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, long timeout, + TimeUnit units, boolean showValues, boolean cached) throws TimeoutException { // Start recording in a Zipkin sub-span our execution time to perform this // operation. @@ -111,7 +112,7 @@ public abstract class Operation { ResultSetFuture futureResultSet = session.executeAsync(statement, uow, timer, showValues); if (uow != null) uow.recordCacheAndDatabaseOperationCount(0, 1); - ResultSet resultSet = futureResultSet.getUninterruptibly(); // TODO(gburd): (timeout, units); + ResultSet resultSet = futureResultSet.getUninterruptibly(timeout, units); return resultSet; } finally { @@ -129,7 +130,7 @@ public abstract class Operation { } } - void log(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) { + void log(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) { if (LOG.isInfoEnabled()) { String uowString = ""; if (uow != null) { diff --git a/src/main/java/net/helenus/core/operation/UpdateOperation.java b/src/main/java/net/helenus/core/operation/UpdateOperation.java index dfdc955..67d4e34 100644 --- a/src/main/java/net/helenus/core/operation/UpdateOperation.java +++ b/src/main/java/net/helenus/core/operation/UpdateOperation.java @@ -16,6 +16,7 @@ package net.helenus.core.operation; import java.util.*; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import com.datastax.driver.core.ResultSet; @@ -568,7 +569,7 @@ public final class UpdateOperation extends AbstractFilterOperation extends AbstractFilterOperation uow) {// throws TimeoutException { + public E sync(UnitOfWork uow) throws TimeoutException { if (uow == null) { return sync(); } diff --git a/src/test/java/net/helenus/test/integration/core/views/MaterializedViewTest.java b/src/test/java/net/helenus/test/integration/core/views/MaterializedViewTest.java index 7cbc247..8d4a431 100644 --- a/src/test/java/net/helenus/test/integration/core/views/MaterializedViewTest.java +++ b/src/test/java/net/helenus/test/integration/core/views/MaterializedViewTest.java @@ -55,12 +55,12 @@ public class MaterializedViewTest extends AbstractEmbeddedCassandraTest { .get(); cyclist = session.dsl(Cyclist.class); - // try { + try { session.insert(cyclist).value(cyclist::cid, UUID.randomUUID()).value(cyclist::age, 18) .value(cyclist::birthday, dateFromString("1997-02-08")).value(cyclist::country, "Netherlands") .value(cyclist::name, "Pascal EENKHOORN").sync(); - // } catch (TimeoutException e) { - // } + } catch (TimeoutException e) { + } } @Test