From b9600ac9318e05bb9e8559db8d54fbd1d5df6465 Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Tue, 8 Aug 2017 15:28:51 -0400 Subject: [PATCH] Fix Zipkin tracing logic. --- NOTES | 38 +++++++++++++++++++ pom.xml | 2 +- .../AbstractFilterStreamOperation.java | 3 +- .../operation/AbstractOptionalOperation.java | 16 +------- .../operation/AbstractStatementOperation.java | 16 +++++++- .../operation/AbstractStreamOperation.java | 36 +++++++++++++++--- .../core/operation/SelectOperation.java | 2 +- 7 files changed, 89 insertions(+), 24 deletions(-) create mode 100644 NOTES diff --git a/NOTES b/NOTES new file mode 100644 index 0000000..cd9a7f6 --- /dev/null +++ b/NOTES @@ -0,0 +1,38 @@ +primitive types have default values, (e.g. boolean, int, ...) but primative wrapper classes do not and can be null (e.g. Boolean, Integer, ...) + +create table wal { + id timeuuid, + follows timeuuid, + read > + write > + primary key (id, follows) +} +begin: + - insert into wal (timeuuid, parent timeuuid, + + + + // NOTE: Update operations have no meaning when they only contain primary key components, so + // given that `properties` is ordered with the keys first if we find that the last element + // is either a partition key or clustering column then we know we should just skip this operation. + ColumnType ct = ((HelenusProperty) properties.toArray()[properties.size() - 1]).getColumnType(); + if (ct != ColumnType.PARTITION_KEY && ct != ColumnType.CLUSTERING_COLUMN) { + return; + } + + + + public Stream sync() { + ListenableFuture> future = async(); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(String contents) { + //...process web site contents + } + + @Override + public void onFailure(Throwable throwable) { + log.error("Exception in task", throwable); + } + }); + } diff --git a/pom.xml b/pom.xml index 7e5be89..1e215f8 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 net.helenus helenus-core - 2.0.7-SNAPSHOT + 2.0.9-SNAPSHOT jar helenus diff --git a/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java index 65e390d..118a0ab 100644 --- a/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java @@ -21,8 +21,7 @@ import java.util.List; import net.helenus.core.*; public abstract class AbstractFilterStreamOperation> - extends - AbstractStreamOperation { + extends AbstractStreamOperation { protected List> filters = null; protected List> ifFilters = null; diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index a63b292..5c56442 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -30,8 +30,6 @@ import java.util.Optional; public abstract class AbstractOptionalOperation> extends AbstractStatementOperation { - Span span; - public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) { super(sessionOperations); } @@ -53,17 +51,6 @@ public abstract class AbstractOptionalOperation withinSpan(Span span) { - if (span != null) { - Tracer tracer = this.sessionOps.getZipkinTracer(); - if (tracer != null) { - this.span = span; - } - } - - return this; - } - public Optional sync() { Tracer tracer = this.sessionOps.getZipkinTracer(); final Span cassandraSpan = (tracer != null && span != null) ? tracer.newChild(span.context()) : null; @@ -95,10 +82,11 @@ public abstract class AbstractOptionalOperation>() { @Override public Optional apply(ResultSet resultSet) { + Optional result = transform(resultSet); if (cassandraSpan != null) { cassandraSpan.finish(); } - return transform(resultSet); + return result; } }, sessionOps.getExecutor()); diff --git a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java index c05d877..1e4a70a 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java @@ -15,6 +15,8 @@ */ package net.helenus.core.operation; +import brave.Span; +import brave.Tracer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +44,7 @@ public abstract class AbstractStatementOperation sync() { - ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); - return transform(resultSet); + public Stream sync() { + Tracer tracer = this.sessionOps.getZipkinTracer(); + final Span cassandraSpan = (tracer != null && span != null) ? tracer.newChild(span.context()) : null; + if (cassandraSpan != null) { + cassandraSpan.name("cassandra"); + cassandraSpan.start(); + } + + ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); + Stream result = transform(resultSet); + + if (cassandraSpan != null) { + cassandraSpan.finish(); + } + + return result; } public ListenableFuture> async() { + Tracer tracer = this.sessionOps.getZipkinTracer(); + final Span cassandraSpan = (tracer != null && span != null) ? tracer.newChild(span.context()) : null; + if (cassandraSpan != null) { + cassandraSpan.name("cassandra"); + cassandraSpan.start(); + } + ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues); ListenableFuture> future = Futures.transform(resultSetFuture, new Function>() { @Override public Stream apply(ResultSet resultSet) { - return transform(resultSet); - } + Stream result = transform(resultSet); + if (cassandraSpan != null) { + cassandraSpan.finish(); + } + return result; + } }, sessionOps.getExecutor()); return future; } diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index 4df5583..5cd1d38 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -166,7 +166,7 @@ public final class SelectOperation extends AbstractFilterStreamOperation