From 071f6466aea72c037077874fe8f33c7b7be27d29 Mon Sep 17 00:00:00 2001 From: Albert Shift Date: Tue, 23 Jun 2015 08:34:20 -0700 Subject: [PATCH] use Scala Option and Stream in Scala future results --- .../operation/AbstractOptionalOperation.java | 46 ++++++++++++++----- .../operation/AbstractStreamOperation.java | 37 +++++++++++---- 2 files changed, 62 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/noorq/casser/core/operation/AbstractOptionalOperation.java b/src/main/java/com/noorq/casser/core/operation/AbstractOptionalOperation.java index 103a126..442577c 100644 --- a/src/main/java/com/noorq/casser/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/com/noorq/casser/core/operation/AbstractOptionalOperation.java @@ -17,6 +17,9 @@ package com.noorq.casser.core.operation; import java.util.Optional; +import scala.None; +import scala.Option; +import scala.Some; import scala.concurrent.Future; import com.datastax.driver.core.PreparedStatement; @@ -82,25 +85,46 @@ public abstract class AbstractOptionalOperation> future() { - return Scala.asFuture(async()); + + public ListenableFuture> asyncForScala() { + + ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues); + + ListenableFuture> future = Futures.transform(resultSetFuture, new Function>() { + + @Override + public Option apply(ResultSet resultSet) { + Optional optional = transform(resultSet); + if (optional.isPresent()) { + return new Some(optional.get()); + } + else { + return Option.empty(); + } + } + + }, sessionOps.getExecutor()); + + return future; + } + public Future> future() { + return Scala.asFuture(asyncForScala()); } - public Future, A>> future(A a) { - return Scala.asFuture(async(), a); + public Future, A>> future(A a) { + return Scala.asFuture(asyncForScala(), a); } - public Future, A, B>> future(A a, B b) { - return Scala.asFuture(async(), a, b); + public Future, A, B>> future(A a, B b) { + return Scala.asFuture(asyncForScala(), a, b); } - public Future, A, B, C>> future(A a, B b, C c) { - return Scala.asFuture(async(), a, b, c); + public Future, A, B, C>> future(A a, B b, C c) { + return Scala.asFuture(asyncForScala(), a, b, c); } - public Future, A, B, C, D>> future(A a, B b, C c, D d) { - return Scala.asFuture(async(), a, b, c, d); + public Future, A, B, C, D>> future(A a, B b, C c, D d) { + return Scala.asFuture(asyncForScala(), a, b, c, d); } } diff --git a/src/main/java/com/noorq/casser/core/operation/AbstractStreamOperation.java b/src/main/java/com/noorq/casser/core/operation/AbstractStreamOperation.java index 5516c57..ec5fbbc 100644 --- a/src/main/java/com/noorq/casser/core/operation/AbstractStreamOperation.java +++ b/src/main/java/com/noorq/casser/core/operation/AbstractStreamOperation.java @@ -83,24 +83,41 @@ public abstract class AbstractStreamOperation> future() { - return Scala.asFuture(async()); + public ListenableFuture> asyncForScala() { + + ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues); + + ListenableFuture> future = Futures.transform(resultSetFuture, new Function>() { + + @Override + public scala.collection.immutable.Stream apply(ResultSet resultSet) { + Stream stream = transform(resultSet); + return scala.collection.JavaConversions.asScalaIterator(stream.iterator()).toStream(); + } + + }, sessionOps.getExecutor()); + + return future; + } + + public Future> future() { + return Scala.asFuture(asyncForScala()); } - public Future, A>> future(A a) { - return Scala.asFuture(async(), a); + public Future, A>> future(A a) { + return Scala.asFuture(asyncForScala(), a); } - public Future, A, B>> future(A a, B b) { - return Scala.asFuture(async(), a, b); + public Future, A, B>> future(A a, B b) { + return Scala.asFuture(asyncForScala(), a, b); } - public Future, A, B, C>> future(A a, B b, C c) { - return Scala.asFuture(async(), a, b, c); + public Future, A, B, C>> future(A a, B b, C c) { + return Scala.asFuture(asyncForScala(), a, b, c); } - public Future, A, B, C, D>> future(A a, B b, C c, D d) { - return Scala.asFuture(async(), a, b, c, d); + public Future, A, B, C, D>> future(A a, B b, C c, D d) { + return Scala.asFuture(asyncForScala(), a, b, c, d); } }