Re-enable TimeoutException. Revert changes to add type to UnitOfWork as it breaks the use of subclasses.

This commit is contained in:
Greg Burd 2017-10-26 10:37:08 -04:00
parent c7e37acc5a
commit b04e033bf4
13 changed files with 149 additions and 90 deletions

View file

@ -86,11 +86,11 @@ public abstract class AbstractSessionOperations {
return execute(statement, null, timer, showValues);
}
public ResultSet execute(Statement statement, UnitOfWork<?> uow, boolean showValues) {
public ResultSet execute(Statement statement, UnitOfWork uow, boolean showValues) {
return execute(statement, uow, null, showValues);
}
public ResultSet execute(Statement statement, UnitOfWork<?> uow, Stopwatch timer, boolean showValues) {
public ResultSet execute(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
return executeAsync(statement, uow, timer, showValues).getUninterruptibly();
}
@ -102,11 +102,11 @@ public abstract class AbstractSessionOperations {
return executeAsync(statement, null, timer, showValues);
}
public ResultSetFuture executeAsync(Statement statement, UnitOfWork<?> uow, boolean showValues) {
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, boolean showValues) {
return executeAsync(statement, uow, null, showValues);
}
public ResultSetFuture executeAsync(Statement statement, UnitOfWork<?> uow, Stopwatch timer, boolean showValues) {
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
try {
logStatement(statement, showValues);
return currentSession().executeAsync(statement);

View file

@ -1,10 +1,26 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core;
public class ConflictingUnitOfWorkException extends Exception {
final UnitOfWork<?> uow;
final UnitOfWork uow;
ConflictingUnitOfWorkException(UnitOfWork<?> uow) {
ConflictingUnitOfWorkException(UnitOfWork uow) {
this.uow = uow;
}
}

View file

@ -5,10 +5,10 @@ import java.util.Objects;
public class PostCommitFunction<T, R> implements java.util.function.Function<T, R> {
private final UnitOfWork<?> uow;
private final UnitOfWork uow;
private final List<CommitThunk> postCommit;
PostCommitFunction(UnitOfWork<?> uow, List<CommitThunk> postCommit) {
PostCommitFunction(UnitOfWork uow, List<CommitThunk> postCommit) {
this.uow = uow;
this.postCommit = postCommit;
}

View file

@ -1,9 +1,26 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeoutException;
import net.helenus.core.ConflictingUnitOfWorkException;
@ -11,7 +28,7 @@ import net.helenus.core.ConflictingUnitOfWorkException;
@Target(ElementType.METHOD)
public @interface Retry {
Class<? extends Exception>[] on() default ConflictingUnitOfWorkException.class;
Class<? extends Exception>[] on() default {ConflictingUnitOfWorkException.class, TimeoutException.class};
int times() default 3;
}

View file

@ -1,3 +1,19 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.aspect;
import java.lang.reflect.Method;

View file

@ -16,6 +16,8 @@
package net.helenus.core.operation;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
@ -35,7 +37,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
return new PreparedOperation<E>(prepareStatement(), this);
}
public E sync() {// throws TimeoutException {
public E sync() throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits,
@ -46,7 +48,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
}
}
public E sync(UnitOfWork<?> uow) {// throws TimeoutException {
public E sync(UnitOfWork uow) throws TimeoutException {
if (uow == null)
return sync();
@ -63,23 +65,23 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
public CompletableFuture<E> async() {
return CompletableFuture.<E>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
public CompletableFuture<E> async(UnitOfWork<?> uow) {
public CompletableFuture<E> async(UnitOfWork uow) {
if (uow == null)
return async();
return CompletableFuture.<E>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
}

View file

@ -20,6 +20,8 @@ import static net.helenus.core.HelenusSession.deleted;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.PreparedStatement;
@ -59,7 +61,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
});
}
public Optional<E> sync() {// throws TimeoutException {
public Optional<E> sync() throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
Optional<E> result = Optional.empty();
@ -102,7 +104,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
}
}
public Optional<E> sync(UnitOfWork<?> uow) {// throws TimeoutException {
public Optional<E> sync(UnitOfWork<?> uow) throws TimeoutException {
if (uow == null)
return sync();
@ -163,19 +165,19 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
}
} else {
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
// Transform the query result set into the desired shape.
result = transform(resultSet);
}
// Transform the query result set into the desired shape.
result = transform(resultSet);
}
// If we have a result, it wasn't from the UOW cache, and we're caching things
// then we need to put this result into the cache for future requests to find.
if (updateCache && result.isPresent() && result.get() != deleted) {
cacheUpdate(uow, result.get(), getFacets());
}
// If we have a result, it wasn't from the UOW cache, and we're caching things
// then we need to put this result into the cache for future requests to find.
if (updateCache && result.isPresent() && result.get() != deleted) {
cacheUpdate(uow, result.get(), getFacets());
}
return result;
} finally {
@ -185,11 +187,11 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
public CompletableFuture<Optional<E>> async() {
return CompletableFuture.<Optional<E>>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
@ -197,11 +199,11 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
if (uow == null)
return async();
return CompletableFuture.<Optional<E>>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
}

View file

@ -20,6 +20,8 @@ import static net.helenus.core.HelenusSession.deleted;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import com.codahale.metrics.Timer;
@ -60,7 +62,7 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
});
}
public Stream<E> sync() {// throws TimeoutException {
public Stream<E> sync() throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
Stream<E> resultStream = null;
@ -109,7 +111,7 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
}
}
public Stream<E> sync(UnitOfWork<?> uow) {// throws TimeoutException {
public Stream<E> sync(UnitOfWork uow) throws TimeoutException {
if (uow == null)
return sync();
@ -162,26 +164,26 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
// Check to see if we fetched the object from the cache
if (resultStream == null) {
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
resultStream = transform(resultSet);
}
ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues, true);
resultStream = transform(resultSet);
}
// If we have a result and we're caching then we need to put it into the cache
// for future requests to find.
if (resultStream != null) {
List<E> again = new ArrayList<>();
List<Facet> facets = getFacets();
resultStream.forEach(result -> {
if (result != deleted) {
if (updateCache) {
cacheUpdate(uow, result, facets);
}
again.add(result);
}
});
resultStream = again.stream();
}
// If we have a result and we're caching then we need to put it into the cache
// for future requests to find.
if (resultStream != null) {
List<E> again = new ArrayList<>();
List<Facet> facets = getFacets();
resultStream.forEach(result -> {
if (result != deleted) {
if (updateCache) {
cacheUpdate(uow, result, facets);
}
again.add(result);
}
});
resultStream = again.stream();
}
return resultStream;
} finally {
@ -191,23 +193,23 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public CompletableFuture<Stream<E>> async() {
return CompletableFuture.<Stream<E>>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
public CompletableFuture<Stream<E>> async(UnitOfWork<?> uow) {
public CompletableFuture<Stream<E>> async(UnitOfWork uow) {
if (uow == null)
return async();
return CompletableFuture.<Stream<E>>supplyAsync(() -> {
// try {
return sync();
// } catch (TimeoutException ex) {
// throw new CompletionException(ex);
// }
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
}
}

View file

@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
@ -168,7 +169,7 @@ public final class DeleteOperation extends AbstractFilterOperation<ResultSet, De
}
@Override
public ResultSet sync() {// throws TimeoutException {
public ResultSet sync() throws TimeoutException {
ResultSet result = super.sync();
if (entity.isCacheable()) {
sessionOps.cacheEvict(bindFacetValues());
@ -182,7 +183,7 @@ public final class DeleteOperation extends AbstractFilterOperation<ResultSet, De
}
@Override
public ResultSet sync(UnitOfWork<?> uow) {// throws TimeoutException {
public ResultSet sync(UnitOfWork uow) throws TimeoutException {
if (uow == null) {
return sync();
}

View file

@ -16,6 +16,7 @@
package net.helenus.core.operation;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import com.datastax.driver.core.ResultSet;
@ -236,7 +237,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
}
@Override
public T sync() {// throws TimeoutException {
public T sync() throws TimeoutException {
T result = super.sync();
if (entity.isCacheable() && result != null) {
sessionOps.updateCache(result, entity.getFacets());
@ -245,7 +246,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
}
@Override
public T sync(UnitOfWork<?> uow) {// throws TimeoutException {
public T sync(UnitOfWork uow) throws TimeoutException {
if (uow == null) {
return sync();
}

View file

@ -18,6 +18,7 @@ package net.helenus.core.operation;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -87,8 +88,8 @@ public abstract class Operation<E> {
return query;
}
public ResultSet execute(AbstractSessionOperations session, UnitOfWork<?> uow, TraceContext traceContext,
long timeout, TimeUnit units, boolean showValues, boolean cached) { // throws TimeoutException {
public ResultSet execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, long timeout,
TimeUnit units, boolean showValues, boolean cached) throws TimeoutException {
// Start recording in a Zipkin sub-span our execution time to perform this
// operation.
@ -111,7 +112,7 @@ public abstract class Operation<E> {
ResultSetFuture futureResultSet = session.executeAsync(statement, uow, timer, showValues);
if (uow != null)
uow.recordCacheAndDatabaseOperationCount(0, 1);
ResultSet resultSet = futureResultSet.getUninterruptibly(); // TODO(gburd): (timeout, units);
ResultSet resultSet = futureResultSet.getUninterruptibly(timeout, units);
return resultSet;
} finally {
@ -129,7 +130,7 @@ public abstract class Operation<E> {
}
}
void log(Statement statement, UnitOfWork<?> uow, Stopwatch timer, boolean showValues) {
void log(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
if (LOG.isInfoEnabled()) {
String uowString = "";
if (uow != null) {

View file

@ -16,6 +16,7 @@
package net.helenus.core.operation;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import com.datastax.driver.core.ResultSet;
@ -568,7 +569,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
}
@Override
public E sync() {// throws TimeoutException {
public E sync() throws TimeoutException {
E result = super.sync();
if (entity.isCacheable() && draft != null) {
sessionOps.updateCache(result, getFacets());
@ -577,7 +578,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
}
@Override
public E sync(UnitOfWork<?> uow) {// throws TimeoutException {
public E sync(UnitOfWork uow) throws TimeoutException {
if (uow == null) {
return sync();
}

View file

@ -55,12 +55,12 @@ public class MaterializedViewTest extends AbstractEmbeddedCassandraTest {
.get();
cyclist = session.dsl(Cyclist.class);
// try {
try {
session.insert(cyclist).value(cyclist::cid, UUID.randomUUID()).value(cyclist::age, 18)
.value(cyclist::birthday, dateFromString("1997-02-08")).value(cyclist::country, "Netherlands")
.value(cyclist::name, "Pascal EENKHOORN").sync();
// } catch (TimeoutException e) {
// }
} catch (TimeoutException e) {
}
}
@Test