Merge branch '2.0.8-SNAPSHOT' into develop

This commit is contained in:
Greg Burd 2017-08-08 15:30:08 -04:00
commit cbc246f1c0
6 changed files with 51 additions and 24 deletions

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.helenus</groupId>
<artifactId>helenus-core</artifactId>
<version>2.0.8-SNAPSHOT</version>
<version>2.0.9-SNAPSHOT</version>
<packaging>jar</packaging>
<name>helenus</name>

View file

@ -21,8 +21,7 @@ import java.util.List;
import net.helenus.core.*;
public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>
extends
AbstractStreamOperation<E, O> {
extends AbstractStreamOperation<E, O> {
protected List<Filter<?>> filters = null;
protected List<Filter<?>> ifFilters = null;

View file

@ -30,8 +30,6 @@ import java.util.Optional;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends AbstractStatementOperation<E, O> {
Span span;
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
}
@ -53,17 +51,6 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
});
}
public AbstractOptionalOperation<E, O> withinSpan(Span span) {
if (span != null) {
Tracer tracer = this.sessionOps.getZipkinTracer();
if (tracer != null) {
this.span = span;
}
}
return this;
}
public Optional<E> sync() {
Tracer tracer = this.sessionOps.getZipkinTracer();
final Span cassandraSpan = (tracer != null && span != null) ? tracer.newChild(span.context()) : null;
@ -95,10 +82,11 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
new Function<ResultSet, Optional<E>>() {
@Override
public Optional<E> apply(ResultSet resultSet) {
Optional<E> result = transform(resultSet);
if (cassandraSpan != null) {
cassandraSpan.finish();
}
return transform(resultSet);
return result;
}
}, sessionOps.getExecutor());

View file

@ -15,6 +15,8 @@
*/
package net.helenus.core.operation;
import brave.Span;
import brave.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,6 +44,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
public abstract Statement buildStatement();
protected boolean showValues = true;
protected Span span;
private ConsistencyLevel consistencyLevel;
private ConsistencyLevel serialConsistencyLevel;
private RetryPolicy retryPolicy;
@ -212,7 +215,18 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
return statement;
}
public Statement statement() {
public O withinSpan(Span span) {
if (span != null) {
Tracer tracer = this.sessionOps.getZipkinTracer();
if (tracer != null) {
this.span = span;
}
}
return (O) this;
}
public Statement statement() {
return buildStatement();
}

View file

@ -17,6 +17,8 @@ package net.helenus.core.operation;
import java.util.stream.Stream;
import brave.Span;
import brave.Tracer;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
@ -50,19 +52,43 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
});
}
public Stream<E> sync() {
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
return transform(resultSet);
public Stream<E> sync() {
Tracer tracer = this.sessionOps.getZipkinTracer();
final Span cassandraSpan = (tracer != null && span != null) ? tracer.newChild(span.context()) : null;
if (cassandraSpan != null) {
cassandraSpan.name("cassandra");
cassandraSpan.start();
}
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
Stream<E> result = transform(resultSet);
if (cassandraSpan != null) {
cassandraSpan.finish();
}
return result;
}
public ListenableFuture<Stream<E>> async() {
Tracer tracer = this.sessionOps.getZipkinTracer();
final Span cassandraSpan = (tracer != null && span != null) ? tracer.newChild(span.context()) : null;
if (cassandraSpan != null) {
cassandraSpan.name("cassandra");
cassandraSpan.start();
}
ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues);
ListenableFuture<Stream<E>> future = Futures.transform(resultSetFuture,
new Function<ResultSet, Stream<E>>() {
@Override
public Stream<E> apply(ResultSet resultSet) {
return transform(resultSet);
}
Stream<E> result = transform(resultSet);
if (cassandraSpan != null) {
cassandraSpan.finish();
}
return result;
}
}, sessionOps.getExecutor());
return future;
}

View file

@ -166,7 +166,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
return this;
}
@Override
@Override
public BuiltStatement buildStatement() {
HelenusEntity entity = null;