diff --git a/src/main/java/net/helenus/core/operation/AbstractOperation.java b/src/main/java/net/helenus/core/operation/AbstractOperation.java index 4bb9bd1..c868fcc 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOperation.java @@ -20,7 +20,7 @@ import java.util.concurrent.CompletableFuture; import net.helenus.core.AbstractSessionOperations; public abstract class AbstractOperation> - extends AbstractStatementOperation { + extends AbstractStatementOperation implements Transformational { public abstract E transform(ResultSet resultSet); @@ -41,16 +41,12 @@ public abstract class AbstractOperation> } public E sync() { - ResultSet resultSet = - sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); - E result = transform(resultSet); - if (cacheable()) { - sessionOps.cache(getCacheKey(), result); - } - return result; + return Executioner.INSTANCE.sync(sessionOps, options(buildStatement()), + traceContext, this, showValues); } public CompletableFuture async() { - return CompletableFuture.supplyAsync(this::sync); + return Executioner.INSTANCE.async(sessionOps, options(buildStatement()), + traceContext, this, showValues); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index c772c6b..a2c5d93 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -15,19 +15,18 @@ */ package net.helenus.core.operation; -import brave.Span; -import brave.Tracer; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import java.util.Optional; import net.helenus.core.AbstractSessionOperations; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + public abstract class AbstractOptionalOperation> - extends AbstractStatementOperation { + extends AbstractStatementOperation implements Transformational> { public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) { super(sessionOperations); @@ -52,51 +51,13 @@ public abstract class AbstractOptionalOperation sync() { - Tracer tracer = this.sessionOps.getZipkinTracer(); - final Span cassandraSpan = - (tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null; - if (cassandraSpan != null) { - cassandraSpan.name("cassandra"); - cassandraSpan.start(); - } - ResultSet resultSet = - sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); - Optional result = transform(resultSet); - - if (cassandraSpan != null) { - cassandraSpan.finish(); - } - - return result; + return Executioner.INSTANCE.>sync(sessionOps, options(buildStatement()), + traceContext, this, showValues); } - public ListenableFuture> async() { - final Tracer tracer = this.sessionOps.getZipkinTracer(); - final Span cassandraSpan = - (tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null; - if (cassandraSpan != null) { - cassandraSpan.name("cassandra"); - cassandraSpan.start(); - } - - ResultSetFuture resultSetFuture = - sessionOps.executeAsync(options(buildStatement()), showValues); - ListenableFuture> future = - Futures.transform( - resultSetFuture, - new Function>() { - @Override - public Optional apply(ResultSet resultSet) { - Optional result = transform(resultSet); - if (cassandraSpan != null) { - cassandraSpan.finish(); - } - return result; - } - }, - sessionOps.getExecutor()); - - return future; + public CompletableFuture> async() { + return Executioner.INSTANCE.>async(sessionOps, options(buildStatement()), + traceContext, this, showValues); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index 8ab0736..a261cb6 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -15,19 +15,18 @@ */ package net.helenus.core.operation; -import brave.Span; -import brave.Tracer; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; + +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import net.helenus.core.AbstractSessionOperations; public abstract class AbstractStreamOperation> - extends AbstractStatementOperation { + extends AbstractStatementOperation implements Transformational> { public AbstractStreamOperation(AbstractSessionOperations sessionOperations) { super(sessionOperations); @@ -52,50 +51,12 @@ public abstract class AbstractStreamOperation sync() { - Tracer tracer = this.sessionOps.getZipkinTracer(); - final Span cassandraSpan = - (tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null; - if (cassandraSpan != null) { - cassandraSpan.name("cassandra"); - cassandraSpan.start(); - } - - ResultSet resultSet = - sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); - Stream result = transform(resultSet); - - if (cassandraSpan != null) { - cassandraSpan.finish(); - } - - return result; + return Executioner.INSTANCE.>sync(sessionOps, options(buildStatement()), + traceContext, this, showValues); } - public ListenableFuture> async() { - Tracer tracer = this.sessionOps.getZipkinTracer(); - final Span cassandraSpan = - (tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null; - if (cassandraSpan != null) { - cassandraSpan.name("cassandra"); - cassandraSpan.start(); - } - - ResultSetFuture resultSetFuture = - sessionOps.executeAsync(options(buildStatement()), showValues); - ListenableFuture> future = - Futures.transform( - resultSetFuture, - new Function>() { - @Override - public Stream apply(ResultSet resultSet) { - Stream result = transform(resultSet); - if (cassandraSpan != null) { - cassandraSpan.finish(); - } - return result; - } - }, - sessionOps.getExecutor()); - return future; + public CompletableFuture> async() { + return Executioner.INSTANCE.>async(sessionOps, options(buildStatement()), + traceContext, this, showValues); } } diff --git a/src/main/java/net/helenus/core/operation/Executioner.java b/src/main/java/net/helenus/core/operation/Executioner.java new file mode 100644 index 0000000..a51d5b6 --- /dev/null +++ b/src/main/java/net/helenus/core/operation/Executioner.java @@ -0,0 +1,61 @@ +package net.helenus.core.operation; + +import brave.Span; +import brave.Tracer; +import brave.propagation.TraceContext; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Statement; +import net.helenus.core.AbstractSessionOperations; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public enum Executioner { + INSTANCE; + + E sync( + AbstractSessionOperations session, + Statement statement, + TraceContext traceContext, + Transformational delegate, + boolean showValues) { + try { + return this.async(session, statement, traceContext, delegate, showValues).get(); + } catch (InterruptedException | ExecutionException e) { + return null; + } + } + + public CompletableFuture async( + AbstractSessionOperations session, + Statement statement, + TraceContext traceContext, + Transformational delegate, + boolean showValues) { + ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); + + return CompletableFuture.supplyAsync( + () -> { + Tracer tracer = session.getZipkinTracer(); + final Span span = + (tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null; + try { + if (span != null) { + span.name("cassandra"); + span.start(); + } + ResultSet resultSet = futureResultSet.get(); // TODO: timeout + E result = delegate.transform(resultSet); + + return result; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } finally { + if (span != null) { + span.finish(); + } + } + }); + } +} diff --git a/src/main/java/net/helenus/core/operation/Transformational.java b/src/main/java/net/helenus/core/operation/Transformational.java new file mode 100644 index 0000000..a4449c2 --- /dev/null +++ b/src/main/java/net/helenus/core/operation/Transformational.java @@ -0,0 +1,7 @@ +package net.helenus.core.operation; + +import com.datastax.driver.core.ResultSet; + +public interface Transformational { + E transform(ResultSet resultSet); +} diff --git a/src/test/java/net/helenus/test/integration/core/tuple/InnerTupleTest.java b/src/test/java/net/helenus/test/integration/core/tuple/InnerTupleTest.java index 631f140..ac9b613 100644 --- a/src/test/java/net/helenus/test/integration/core/tuple/InnerTupleTest.java +++ b/src/test/java/net/helenus/test/integration/core/tuple/InnerTupleTest.java @@ -15,8 +15,7 @@ */ package net.helenus.test.integration.core.tuple; - - +import net.helenus.core.Helenus; import net.helenus.core.HelenusSession; import net.helenus.core.Query; import net.helenus.test.integration.build.AbstractEmbeddedCassandraTest; @@ -24,115 +23,121 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import net.helenus.core.Helenus; - public class InnerTupleTest extends AbstractEmbeddedCassandraTest { - static PhotoAlbum photoAlbum; + static PhotoAlbum photoAlbum; - static HelenusSession session; + static HelenusSession session; - @BeforeClass - public static void beforeTest() { - session = Helenus.init(getSession()).showCql().add(PhotoAlbum.class).autoCreateDrop().get(); - photoAlbum = Helenus.dsl(PhotoAlbum.class, session.getMetadata()); - } + @BeforeClass + public static void beforeTest() { + session = Helenus.init(getSession()).showCql().add(PhotoAlbum.class).autoCreateDrop().get(); + photoAlbum = Helenus.dsl(PhotoAlbum.class, session.getMetadata()); + } - @Test - public void testPrint() { - System.out.println(photoAlbum); - } + @Test + public void testPrint() { + System.out.println(photoAlbum); + } + @Test + public void testCruid() { - @Test - public void testCruid() { + Photo photo = + new Photo() { - Photo photo = new Photo() { + @Override + public byte[] blob() { + return "jpeg".getBytes(); + } + }; - @Override - public byte[] blob() { - return "jpeg".getBytes(); - } + PhotoFolder folder = + new PhotoFolder() { - }; + @Override + public String name() { + return "first"; + } - PhotoFolder folder = new PhotoFolder() { + @Override + public Photo photo() { + return photo; + } + }; - @Override - public String name() { - return "first"; - } + // CREATE (C) - @Override - public Photo photo() { - return photo; - } + session.insert().value(photoAlbum::id, 123).value(photoAlbum::folder, folder).sync(); - }; + // READ (R) - // CREATE (C) + PhotoFolder actual = + session + .select(photoAlbum::folder) + .where(photoAlbum::id, Query.eq(123)) + .sync() + .findFirst() + .get() + ._1; - session.insert() - .value(photoAlbum::id, 123) - .value(photoAlbum::folder, folder) - .sync(); + Assert.assertEquals(folder.name(), actual.name()); - // READ (R) + // UPDATE (U) - PhotoFolder actual = session.select(photoAlbum::folder).where(photoAlbum::id, Query.eq(123)).sync().findFirst().get()._1; + // unfortunately this is not working right now in Cassandra, can not update a single column in tuple :( + //session.update() + // .set(photoAlbum.folder().photo()::blob, "Helenus".getBytes()) + // .where(photoAlbum::id, eq(123)) + // .sync(); - Assert.assertEquals(folder.name(), actual.name()); + PhotoFolder expected = + new PhotoFolder() { - // UPDATE (U) + @Override + public String name() { + return "seconds"; + } - // unfortunately this is not working right now in Cassandra, can not update a single column in tuple :( - //session.update() - // .set(photoAlbum.folder().photo()::blob, "Helenus".getBytes()) - // .where(photoAlbum::id, eq(123)) - // .sync(); + @Override + public Photo photo() { + return photo; + } + }; - PhotoFolder expected = new PhotoFolder() { + session.update().set(photoAlbum::folder, expected).where(photoAlbum::id, Query.eq(123)).sync(); - @Override - public String name() { - return "seconds"; - } + actual = + session + .select(photoAlbum::folder) + .where(photoAlbum::id, Query.eq(123)) + .sync() + .findFirst() + .get() + ._1; - @Override - public Photo photo() { - return photo; - } + Assert.assertEquals(expected.name(), actual.name()); - }; + // INSERT (I) + // let's insert null ;) - session.update() - .set(photoAlbum::folder, expected) - .where(photoAlbum::id, Query.eq(123)) - .sync(); + session.update().set(photoAlbum::folder, null).where(photoAlbum::id, Query.eq(123)).sync(); - actual = session.select(photoAlbum::folder).where(photoAlbum::id, Query.eq(123)).sync().findFirst().get()._1; - - Assert.assertEquals(expected.name(), actual.name()); - - // INSERT (I) - // let's insert null ;) - - session.update() - .set(photoAlbum::folder, null) - .where(photoAlbum::id, Query.eq(123)) - .sync(); - - actual = session.select(photoAlbum::folder).where(photoAlbum::id, Query.eq(123)).sync().findFirst().get()._1; - Assert.assertNull(actual); - - // DELETE (D) - session.delete().where(photoAlbum::id, Query.eq(123)).sync(); - - long cnt = session.select(photoAlbum::folder).where(photoAlbum::id, Query.eq(123)).sync().count(); - Assert.assertEquals(0, cnt); - - - } + actual = + session + .select(photoAlbum::folder) + .where(photoAlbum::id, Query.eq(123)) + .sync() + .findFirst() + .get() + ._1; + Assert.assertNull(actual); + // DELETE (D) + session.delete().where(photoAlbum::id, Query.eq(123)).sync(); + long cnt = + session.select(photoAlbum::folder).where(photoAlbum::id, Query.eq(123)).sync().count(); + Assert.assertEquals(0, cnt); + } }