add scala 2.11 support

This commit is contained in:
Albert Shift 2015-06-03 12:22:22 -07:00
parent fc0eaef8ee
commit 869fd0aae3
5 changed files with 126 additions and 1 deletions

View file

@ -88,11 +88,16 @@
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.6</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>

View file

@ -15,6 +15,8 @@
*/
package com.noorq.casser.core.operation;
import scala.concurrent.Future;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
@ -22,6 +24,7 @@ import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.noorq.casser.core.AbstractSessionOperations;
import com.noorq.casser.support.Scala;
public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> extends AbstractStatementOperation<E, O> {
@ -50,6 +53,10 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
}
public Future<PreparedOperation<E>> prepareFuture() {
return Scala.asFuture(prepareAsync());
}
public E sync() {
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
@ -73,5 +80,8 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
return future;
}
public Future<E> future() {
return Scala.asFuture(async());
}
}

View file

@ -18,6 +18,8 @@ package com.noorq.casser.core.operation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
@ -30,6 +32,7 @@ import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.google.common.util.concurrent.ListenableFuture;
import com.noorq.casser.core.AbstractSessionOperations;
import com.noorq.casser.support.CasserException;
import com.noorq.casser.support.Scala;
public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>> {
@ -230,4 +233,9 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
throw new CasserException("only RegularStatements can be prepared");
}
public Future<PreparedStatement> prepareStatementFuture() {
return Scala.asFuture(prepareStatementAsync());
}
}

View file

@ -17,6 +17,8 @@ package com.noorq.casser.core.operation;
import java.util.stream.Stream;
import scala.concurrent.Future;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
@ -24,6 +26,7 @@ import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.noorq.casser.core.AbstractSessionOperations;
import com.noorq.casser.support.Scala;
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>> extends AbstractStatementOperation<E, O> {
@ -52,6 +55,10 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
}
public Future<PreparedStreamOperation<E>> prepareFuture() {
return Scala.asFuture(prepareAsync());
}
public Stream<E> sync() {
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
@ -75,5 +82,8 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
return future;
}
public Future<Stream<E>> future() {
return Scala.asFuture(async());
}
}

View file

@ -0,0 +1,92 @@
/*
* Copyright (C) 2015 Noorq, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.noorq.casser.support;
import scala.concurrent.Future;
import scala.concurrent.impl.Promise.DefaultPromise;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
public final class Scala {
public static <T> Future<T> asFuture(ListenableFuture<T> future) {
final scala.concurrent.Promise<T> promise = new DefaultPromise<T>();
Futures.addCallback(future, new FutureCallback<T>() {
@Override public void onSuccess(T result) {
promise.success(result);
}
@Override public void onFailure(Throwable t) {
promise.failure(t);
}
});
return promise.future();
}
public static <T, A> Future<Fun.Tuple2<T, A>> asFuture(ListenableFuture<T> future, A a) {
final scala.concurrent.Promise<Fun.Tuple2<T, A>> promise = new DefaultPromise<Fun.Tuple2<T, A>>();
Futures.addCallback(future, new FutureCallback<T>() {
@Override public void onSuccess(T result) {
promise.success(new Fun.Tuple2<T, A>(result, a));
}
@Override public void onFailure(Throwable t) {
promise.failure(t);
}
});
return promise.future();
}
public static <T, A, B> Future<Fun.Tuple3<T, A, B>> asFuture(ListenableFuture<T> future, A a, B b) {
final scala.concurrent.Promise<Fun.Tuple3<T, A, B>> promise = new DefaultPromise<Fun.Tuple3<T, A, B>>();
Futures.addCallback(future, new FutureCallback<T>() {
@Override public void onSuccess(T result) {
promise.success(new Fun.Tuple3<T, A, B>(result, a, b));
}
@Override public void onFailure(Throwable t) {
promise.failure(t);
}
});
return promise.future();
}
public static <T, A, B, C> Future<Fun.Tuple4<T, A, B, C>> asFuture(ListenableFuture<T> future, A a, B b, C c) {
final scala.concurrent.Promise<Fun.Tuple4<T, A, B, C>> promise = new DefaultPromise<Fun.Tuple4<T, A, B, C>>();
Futures.addCallback(future, new FutureCallback<T>() {
@Override public void onSuccess(T result) {
promise.success(new Fun.Tuple4<T, A, B, C>(result, a, b, c));
}
@Override public void onFailure(Throwable t) {
promise.failure(t);
}
});
return promise.future();
}
public static <T, A, B, C, D> Future<Fun.Tuple5<T, A, B, C, D>> asFuture(ListenableFuture<T> future, A a, B b, C c, D d) {
final scala.concurrent.Promise<Fun.Tuple5<T, A, B, C, D>> promise = new DefaultPromise<Fun.Tuple5<T, A, B, C, D>>();
Futures.addCallback(future, new FutureCallback<T>() {
@Override public void onSuccess(T result) {
promise.success(new Fun.Tuple5<T, A, B, C, D>(result, a, b, c, d));
}
@Override public void onFailure(Throwable t) {
promise.failure(t);
}
});
return promise.future();
}
}