Fix Zipkin tracing logic.

This commit is contained in:
Greg Burd 2017-08-08 15:28:51 -04:00
parent 71e84da3bd
commit b9600ac931
7 changed files with 89 additions and 24 deletions

38
NOTES Normal file
View file

@ -0,0 +1,38 @@
primitive types have default values, (e.g. boolean, int, ...) but primative wrapper classes do not and can be null (e.g. Boolean, Integer, ...)
create table wal {
id timeuuid,
follows timeuuid,
read <Counting Quotient Filter, Set<{keyspace, col, schema generation, timestamp}>>
write <Counting Quotient Filter, Set<{keyspace, col, schema generation, timestamp}>>
primary key (id, follows)
}
begin:
- insert into wal (timeuuid, parent timeuuid,
// NOTE: Update operations have no meaning when they only contain primary key components, so
// given that `properties` is ordered with the keys first if we find that the last element
// is either a partition key or clustering column then we know we should just skip this operation.
ColumnType ct = ((HelenusProperty) properties.toArray()[properties.size() - 1]).getColumnType();
if (ct != ColumnType.PARTITION_KEY && ct != ColumnType.CLUSTERING_COLUMN) {
return;
}
public Stream<E> sync() {
ListenableFuture<Stream<E>> future = async();
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String contents) {
//...process web site contents
}
@Override
public void onFailure(Throwable throwable) {
log.error("Exception in task", throwable);
}
});
}

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.helenus</groupId>
<artifactId>helenus-core</artifactId>
<version>2.0.7-SNAPSHOT</version>
<version>2.0.9-SNAPSHOT</version>
<packaging>jar</packaging>
<name>helenus</name>

View file

@ -21,8 +21,7 @@ import java.util.List;
import net.helenus.core.*;
public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>
extends
AbstractStreamOperation<E, O> {
extends AbstractStreamOperation<E, O> {
protected List<Filter<?>> filters = null;
protected List<Filter<?>> ifFilters = null;

View file

@ -30,8 +30,6 @@ import java.util.Optional;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends AbstractStatementOperation<E, O> {
Span span;
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
}
@ -53,17 +51,6 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
});
}
public AbstractOptionalOperation<E, O> withinSpan(Span span) {
if (span != null) {
Tracer tracer = this.sessionOps.getZipkinTracer();
if (tracer != null) {
this.span = span;
}
}
return this;
}
public Optional<E> sync() {
Tracer tracer = this.sessionOps.getZipkinTracer();
final Span cassandraSpan = (tracer != null && span != null) ? tracer.newChild(span.context()) : null;
@ -95,10 +82,11 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
new Function<ResultSet, Optional<E>>() {
@Override
public Optional<E> apply(ResultSet resultSet) {
Optional<E> result = transform(resultSet);
if (cassandraSpan != null) {
cassandraSpan.finish();
}
return transform(resultSet);
return result;
}
}, sessionOps.getExecutor());

View file

@ -15,6 +15,8 @@
*/
package net.helenus.core.operation;
import brave.Span;
import brave.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,6 +44,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
public abstract Statement buildStatement();
protected boolean showValues = true;
protected Span span;
private ConsistencyLevel consistencyLevel;
private ConsistencyLevel serialConsistencyLevel;
private RetryPolicy retryPolicy;
@ -212,7 +215,18 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
return statement;
}
public Statement statement() {
public O withinSpan(Span span) {
if (span != null) {
Tracer tracer = this.sessionOps.getZipkinTracer();
if (tracer != null) {
this.span = span;
}
}
return (O) this;
}
public Statement statement() {
return buildStatement();
}

View file

@ -17,6 +17,8 @@ package net.helenus.core.operation;
import java.util.stream.Stream;
import brave.Span;
import brave.Tracer;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
@ -50,19 +52,43 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
});
}
public Stream<E> sync() {
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
return transform(resultSet);
public Stream<E> sync() {
Tracer tracer = this.sessionOps.getZipkinTracer();
final Span cassandraSpan = (tracer != null && span != null) ? tracer.newChild(span.context()) : null;
if (cassandraSpan != null) {
cassandraSpan.name("cassandra");
cassandraSpan.start();
}
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
Stream<E> result = transform(resultSet);
if (cassandraSpan != null) {
cassandraSpan.finish();
}
return result;
}
public ListenableFuture<Stream<E>> async() {
Tracer tracer = this.sessionOps.getZipkinTracer();
final Span cassandraSpan = (tracer != null && span != null) ? tracer.newChild(span.context()) : null;
if (cassandraSpan != null) {
cassandraSpan.name("cassandra");
cassandraSpan.start();
}
ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues);
ListenableFuture<Stream<E>> future = Futures.transform(resultSetFuture,
new Function<ResultSet, Stream<E>>() {
@Override
public Stream<E> apply(ResultSet resultSet) {
return transform(resultSet);
}
Stream<E> result = transform(resultSet);
if (cassandraSpan != null) {
cassandraSpan.finish();
}
return result;
}
}, sessionOps.getExecutor());
return future;
}

View file

@ -166,7 +166,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
return this;
}
@Override
@Override
public BuiltStatement buildStatement() {
HelenusEntity entity = null;