diff --git a/pom.xml b/pom.xml index f3cd2f0..46c824e 100644 --- a/pom.xml +++ b/pom.xml @@ -88,11 +88,16 @@ - + + org.scala-lang + scala-library + 2.11.6 + + com.datastax.cassandra cassandra-driver-core diff --git a/src/main/java/com/noorq/casser/core/operation/AbstractOperation.java b/src/main/java/com/noorq/casser/core/operation/AbstractOperation.java index a266d2f..8492094 100644 --- a/src/main/java/com/noorq/casser/core/operation/AbstractOperation.java +++ b/src/main/java/com/noorq/casser/core/operation/AbstractOperation.java @@ -15,6 +15,8 @@ */ package com.noorq.casser.core.operation; +import scala.concurrent.Future; + import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; @@ -22,6 +24,7 @@ import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.noorq.casser.core.AbstractSessionOperations; +import com.noorq.casser.support.Scala; public abstract class AbstractOperation> extends AbstractStatementOperation { @@ -50,6 +53,10 @@ public abstract class AbstractOperation> ex } + public Future> prepareFuture() { + return Scala.asFuture(prepareAsync()); + } + public E sync() { ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); @@ -73,5 +80,8 @@ public abstract class AbstractOperation> ex return future; } + public Future future() { + return Scala.asFuture(async()); + } } diff --git a/src/main/java/com/noorq/casser/core/operation/AbstractStatementOperation.java b/src/main/java/com/noorq/casser/core/operation/AbstractStatementOperation.java index 5dbcf43..73c7f9a 100644 --- a/src/main/java/com/noorq/casser/core/operation/AbstractStatementOperation.java +++ b/src/main/java/com/noorq/casser/core/operation/AbstractStatementOperation.java @@ -18,6 +18,8 @@ package com.noorq.casser.core.operation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.RegularStatement; @@ -30,6 +32,7 @@ import com.datastax.driver.core.querybuilder.BuiltStatement; import com.google.common.util.concurrent.ListenableFuture; import com.noorq.casser.core.AbstractSessionOperations; import com.noorq.casser.support.CasserException; +import com.noorq.casser.support.Scala; public abstract class AbstractStatementOperation> { @@ -230,4 +233,9 @@ public abstract class AbstractStatementOperation prepareStatementFuture() { + return Scala.asFuture(prepareStatementAsync()); + } + } diff --git a/src/main/java/com/noorq/casser/core/operation/AbstractStreamOperation.java b/src/main/java/com/noorq/casser/core/operation/AbstractStreamOperation.java index f5a6e71..f7fc9f9 100644 --- a/src/main/java/com/noorq/casser/core/operation/AbstractStreamOperation.java +++ b/src/main/java/com/noorq/casser/core/operation/AbstractStreamOperation.java @@ -17,6 +17,8 @@ package com.noorq.casser.core.operation; import java.util.stream.Stream; +import scala.concurrent.Future; + import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; @@ -24,6 +26,7 @@ import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.noorq.casser.core.AbstractSessionOperations; +import com.noorq.casser.support.Scala; public abstract class AbstractStreamOperation> extends AbstractStatementOperation { @@ -52,6 +55,10 @@ public abstract class AbstractStreamOperation> prepareFuture() { + return Scala.asFuture(prepareAsync()); + } + public Stream sync() { ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); @@ -75,5 +82,8 @@ public abstract class AbstractStreamOperation> future() { + return Scala.asFuture(async()); + } } diff --git a/src/main/java/com/noorq/casser/support/Scala.java b/src/main/java/com/noorq/casser/support/Scala.java new file mode 100644 index 0000000..50311f0 --- /dev/null +++ b/src/main/java/com/noorq/casser/support/Scala.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2015 Noorq, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.noorq.casser.support; + +import scala.concurrent.Future; +import scala.concurrent.impl.Promise.DefaultPromise; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +public final class Scala { + + public static Future asFuture(ListenableFuture future) { + final scala.concurrent.Promise promise = new DefaultPromise(); + Futures.addCallback(future, new FutureCallback() { + @Override public void onSuccess(T result) { + promise.success(result); + } + @Override public void onFailure(Throwable t) { + promise.failure(t); + } + }); + return promise.future(); + } + + public static Future> asFuture(ListenableFuture future, A a) { + final scala.concurrent.Promise> promise = new DefaultPromise>(); + Futures.addCallback(future, new FutureCallback() { + @Override public void onSuccess(T result) { + promise.success(new Fun.Tuple2(result, a)); + } + @Override public void onFailure(Throwable t) { + promise.failure(t); + } + }); + return promise.future(); + } + + public static Future> asFuture(ListenableFuture future, A a, B b) { + final scala.concurrent.Promise> promise = new DefaultPromise>(); + Futures.addCallback(future, new FutureCallback() { + @Override public void onSuccess(T result) { + promise.success(new Fun.Tuple3(result, a, b)); + } + @Override public void onFailure(Throwable t) { + promise.failure(t); + } + }); + return promise.future(); + } + + public static Future> asFuture(ListenableFuture future, A a, B b, C c) { + final scala.concurrent.Promise> promise = new DefaultPromise>(); + Futures.addCallback(future, new FutureCallback() { + @Override public void onSuccess(T result) { + promise.success(new Fun.Tuple4(result, a, b, c)); + } + @Override public void onFailure(Throwable t) { + promise.failure(t); + } + }); + return promise.future(); + } + + public static Future> asFuture(ListenableFuture future, A a, B b, C c, D d) { + final scala.concurrent.Promise> promise = new DefaultPromise>(); + Futures.addCallback(future, new FutureCallback() { + @Override public void onSuccess(T result) { + promise.success(new Fun.Tuple5(result, a, b, c, d)); + } + @Override public void onFailure(Throwable t) { + promise.failure(t); + } + }); + return promise.future(); + } + +}