Funnel all paths to the database through one single method implementation (the 'Executioner') so as to ensure all calls into Cassandra are wrapped in a common way (traced, measured, cached, etc.) in a single place.

This commit is contained in:
Greg Burd 2017-08-17 12:05:50 -04:00
parent 6ad99fc459
commit 142688a215
6 changed files with 179 additions and 188 deletions

View file

@ -20,7 +20,7 @@ import java.util.concurrent.CompletableFuture;
import net.helenus.core.AbstractSessionOperations; import net.helenus.core.AbstractSessionOperations;
public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
extends AbstractStatementOperation<E, O> { extends AbstractStatementOperation<E, O> implements Transformational<E> {
public abstract E transform(ResultSet resultSet); public abstract E transform(ResultSet resultSet);
@ -41,16 +41,12 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
} }
public E sync() { public E sync() {
ResultSet resultSet = return Executioner.INSTANCE.<E>sync(sessionOps, options(buildStatement()),
sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); traceContext, this, showValues);
E result = transform(resultSet);
if (cacheable()) {
sessionOps.cache(getCacheKey(), result);
}
return result;
} }
public CompletableFuture<E> async() { public CompletableFuture<E> async() {
return CompletableFuture.supplyAsync(this::sync); return Executioner.INSTANCE.<E>async(sessionOps, options(buildStatement()),
traceContext, this, showValues);
} }
} }

View file

@ -15,19 +15,18 @@
*/ */
package net.helenus.core.operation; package net.helenus.core.operation;
import brave.Span;
import brave.Tracer;
import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import java.util.Optional;
import net.helenus.core.AbstractSessionOperations; import net.helenus.core.AbstractSessionOperations;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>> public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends AbstractStatementOperation<E, O> { extends AbstractStatementOperation<E, O> implements Transformational<Optional<E>> {
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) { public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations); super(sessionOperations);
@ -52,51 +51,13 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
} }
public Optional<E> sync() { public Optional<E> sync() {
Tracer tracer = this.sessionOps.getZipkinTracer();
final Span cassandraSpan =
(tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null;
if (cassandraSpan != null) {
cassandraSpan.name("cassandra");
cassandraSpan.start();
}
ResultSet resultSet = return Executioner.INSTANCE.<Optional<E>>sync(sessionOps, options(buildStatement()),
sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); traceContext, this, showValues);
Optional<E> result = transform(resultSet);
if (cassandraSpan != null) {
cassandraSpan.finish();
}
return result;
} }
public ListenableFuture<Optional<E>> async() { public CompletableFuture<Optional<E>> async() {
final Tracer tracer = this.sessionOps.getZipkinTracer(); return Executioner.INSTANCE.<Optional<E>>async(sessionOps, options(buildStatement()),
final Span cassandraSpan = traceContext, this, showValues);
(tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null;
if (cassandraSpan != null) {
cassandraSpan.name("cassandra");
cassandraSpan.start();
}
ResultSetFuture resultSetFuture =
sessionOps.executeAsync(options(buildStatement()), showValues);
ListenableFuture<Optional<E>> future =
Futures.transform(
resultSetFuture,
new Function<ResultSet, Optional<E>>() {
@Override
public Optional<E> apply(ResultSet resultSet) {
Optional<E> result = transform(resultSet);
if (cassandraSpan != null) {
cassandraSpan.finish();
}
return result;
}
},
sessionOps.getExecutor());
return future;
} }
} }

View file

@ -15,19 +15,18 @@
*/ */
package net.helenus.core.operation; package net.helenus.core.operation;
import brave.Span;
import brave.Tracer;
import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream; import java.util.stream.Stream;
import net.helenus.core.AbstractSessionOperations; import net.helenus.core.AbstractSessionOperations;
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>> public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
extends AbstractStatementOperation<E, O> { extends AbstractStatementOperation<E, O> implements Transformational<Stream<E>> {
public AbstractStreamOperation(AbstractSessionOperations sessionOperations) { public AbstractStreamOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations); super(sessionOperations);
@ -52,50 +51,12 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
} }
public Stream<E> sync() { public Stream<E> sync() {
Tracer tracer = this.sessionOps.getZipkinTracer(); return Executioner.INSTANCE.<Stream<E>>sync(sessionOps, options(buildStatement()),
final Span cassandraSpan = traceContext, this, showValues);
(tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null;
if (cassandraSpan != null) {
cassandraSpan.name("cassandra");
cassandraSpan.start();
}
ResultSet resultSet =
sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
Stream<E> result = transform(resultSet);
if (cassandraSpan != null) {
cassandraSpan.finish();
}
return result;
} }
public ListenableFuture<Stream<E>> async() { public CompletableFuture<Stream<E>> async() {
Tracer tracer = this.sessionOps.getZipkinTracer(); return Executioner.INSTANCE.<Stream<E>>async(sessionOps, options(buildStatement()),
final Span cassandraSpan = traceContext, this, showValues);
(tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null;
if (cassandraSpan != null) {
cassandraSpan.name("cassandra");
cassandraSpan.start();
}
ResultSetFuture resultSetFuture =
sessionOps.executeAsync(options(buildStatement()), showValues);
ListenableFuture<Stream<E>> future =
Futures.transform(
resultSetFuture,
new Function<ResultSet, Stream<E>>() {
@Override
public Stream<E> apply(ResultSet resultSet) {
Stream<E> result = transform(resultSet);
if (cassandraSpan != null) {
cassandraSpan.finish();
}
return result;
}
},
sessionOps.getExecutor());
return future;
} }
} }

View file

@ -0,0 +1,61 @@
package net.helenus.core.operation;
import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import net.helenus.core.AbstractSessionOperations;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public enum Executioner {
INSTANCE;
<E> E sync(
AbstractSessionOperations session,
Statement statement,
TraceContext traceContext,
Transformational<E> delegate,
boolean showValues) {
try {
return this.<E>async(session, statement, traceContext, delegate, showValues).get();
} catch (InterruptedException | ExecutionException e) {
return null;
}
}
public <E> CompletableFuture<E> async(
AbstractSessionOperations session,
Statement statement,
TraceContext traceContext,
Transformational<E> delegate,
boolean showValues) {
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return CompletableFuture.supplyAsync(
() -> {
Tracer tracer = session.getZipkinTracer();
final Span span =
(tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null;
try {
if (span != null) {
span.name("cassandra");
span.start();
}
ResultSet resultSet = futureResultSet.get(); // TODO: timeout
E result = delegate.transform(resultSet);
return result;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (span != null) {
span.finish();
}
}
});
}
}

View file

@ -0,0 +1,7 @@
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
public interface Transformational<E> {
E transform(ResultSet resultSet);
}

View file

@ -15,8 +15,7 @@
*/ */
package net.helenus.test.integration.core.tuple; package net.helenus.test.integration.core.tuple;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession; import net.helenus.core.HelenusSession;
import net.helenus.core.Query; import net.helenus.core.Query;
import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest; import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest;
@ -24,115 +23,121 @@ import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import net.helenus.core.Helenus;
public class InnerTupleTest extends AbstractEmbeddedCassandraTest { public class InnerTupleTest extends AbstractEmbeddedCassandraTest {
static PhotoAlbum photoAlbum; static PhotoAlbum photoAlbum;
static HelenusSession session; static HelenusSession session;
@BeforeClass @BeforeClass
public static void beforeTest() { public static void beforeTest() {
session = Helenus.init(getSession()).showCql().add(PhotoAlbum.class).autoCreateDrop().get(); session = Helenus.init(getSession()).showCql().add(PhotoAlbum.class).autoCreateDrop().get();
photoAlbum = Helenus.dsl(PhotoAlbum.class, session.getMetadata()); photoAlbum = Helenus.dsl(PhotoAlbum.class, session.getMetadata());
} }
@Test @Test
public void testPrint() { public void testPrint() {
System.out.println(photoAlbum); System.out.println(photoAlbum);
} }
@Test
public void testCruid() {
@Test Photo photo =
public void testCruid() { new Photo() {
Photo photo = new Photo() { @Override
public byte[] blob() {
return "jpeg".getBytes();
}
};
@Override PhotoFolder folder =
public byte[] blob() { new PhotoFolder() {
return "jpeg".getBytes();
}
}; @Override
public String name() {
return "first";
}
PhotoFolder folder = new PhotoFolder() { @Override
public Photo photo() {
return photo;
}
};
@Override // CREATE (C)
public String name() {
return "first";
}
@Override session.insert().value(photoAlbum::id, 123).value(photoAlbum::folder, folder).sync();
public Photo photo() {
return photo;
}
}; // READ (R)
// CREATE (C) PhotoFolder actual =
session
.select(photoAlbum::folder)
.where(photoAlbum::id, Query.eq(123))
.sync()
.findFirst()
.get()
._1;
session.insert() Assert.assertEquals(folder.name(), actual.name());
.value(photoAlbum::id, 123)
.value(photoAlbum::folder, folder)
.sync();
// READ (R) // UPDATE (U)
PhotoFolder actual = session.select(photoAlbum::folder).where(photoAlbum::id, Query.eq(123)).sync().findFirst().get()._1; // unfortunately this is not working right now in Cassandra, can not update a single column in tuple :(
//session.update()
// .set(photoAlbum.folder().photo()::blob, "Helenus".getBytes())
// .where(photoAlbum::id, eq(123))
// .sync();
Assert.assertEquals(folder.name(), actual.name()); PhotoFolder expected =
new PhotoFolder() {
// UPDATE (U) @Override
public String name() {
return "seconds";
}
// unfortunately this is not working right now in Cassandra, can not update a single column in tuple :( @Override
//session.update() public Photo photo() {
// .set(photoAlbum.folder().photo()::blob, "Helenus".getBytes()) return photo;
// .where(photoAlbum::id, eq(123)) }
// .sync(); };
PhotoFolder expected = new PhotoFolder() { session.update().set(photoAlbum::folder, expected).where(photoAlbum::id, Query.eq(123)).sync();
@Override actual =
public String name() { session
return "seconds"; .select(photoAlbum::folder)
} .where(photoAlbum::id, Query.eq(123))
.sync()
.findFirst()
.get()
._1;
@Override Assert.assertEquals(expected.name(), actual.name());
public Photo photo() {
return photo;
}
}; // INSERT (I)
// let's insert null ;)
session.update() session.update().set(photoAlbum::folder, null).where(photoAlbum::id, Query.eq(123)).sync();
.set(photoAlbum::folder, expected)
.where(photoAlbum::id, Query.eq(123))
.sync();
actual = session.select(photoAlbum::folder).where(photoAlbum::id, Query.eq(123)).sync().findFirst().get()._1; actual =
session
Assert.assertEquals(expected.name(), actual.name()); .select(photoAlbum::folder)
.where(photoAlbum::id, Query.eq(123))
// INSERT (I) .sync()
// let's insert null ;) .findFirst()
.get()
session.update() ._1;
.set(photoAlbum::folder, null) Assert.assertNull(actual);
.where(photoAlbum::id, Query.eq(123))
.sync();
actual = session.select(photoAlbum::folder).where(photoAlbum::id, Query.eq(123)).sync().findFirst().get()._1;
Assert.assertNull(actual);
// DELETE (D)
session.delete().where(photoAlbum::id, Query.eq(123)).sync();
long cnt = session.select(photoAlbum::folder).where(photoAlbum::id, Query.eq(123)).sync().count();
Assert.assertEquals(0, cnt);
}
// DELETE (D)
session.delete().where(photoAlbum::id, Query.eq(123)).sync();
long cnt =
session.select(photoAlbum::folder).where(photoAlbum::id, Query.eq(123)).sync().count();
Assert.assertEquals(0, cnt);
}
} }