Remove Scala support and trim Future support at some point I'll re-introduce using Java 8 classes rather than Guava's or Scala's
This commit is contained in:
parent
c42803b964
commit
f4dbf34920
11 changed files with 35 additions and 391 deletions
27
README.md
27
README.md
|
@ -1,5 +1,5 @@
|
|||
# Helenus
|
||||
Fast and easy, functional style cutting edge Java 8 and Scala 2.11 Cassandra client for C* 3.x
|
||||
Fast and easy, functional style cutting edge Java 8 Cassandra client for C* 3.x
|
||||
|
||||
|
||||
### Features
|
||||
|
@ -9,14 +9,13 @@ Fast and easy, functional style cutting edge Java 8 and Scala 2.11 Cassandra cli
|
|||
* Reactive asynchronous and synchronous API
|
||||
* Provides Java mapping for Tables, Tuples, UDTs (User Defined Type), Collections, UDT Collections, Tuple Collections
|
||||
* Uses lazy mapping in all cases where possible
|
||||
* Supports Guava ListenableFuture and Scala Future
|
||||
* Supports Java 8 Futures and Guava ListenableFuture
|
||||
|
||||
### Requirements
|
||||
|
||||
* JVM 8
|
||||
* Datastax Driver 3.x
|
||||
* Cassandra 3.x
|
||||
* Scala 2.11+
|
||||
* Maven
|
||||
|
||||
### Maven
|
||||
|
@ -32,27 +31,6 @@ Latest release dependency:
|
|||
</dependencies>
|
||||
```
|
||||
|
||||
Active development dependency for Scala 2.11:
|
||||
```
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>net.helenus</groupId>
|
||||
<artifactId>helenus-core</artifactId>
|
||||
<version>1.2.0_2.11-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>oss-sonatype</id>
|
||||
<name>oss-sonatype</name>
|
||||
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
</repositories>
|
||||
```
|
||||
|
||||
### Simple Example
|
||||
|
||||
|
@ -132,7 +110,6 @@ public interface AbstractRepository {
|
|||
|
||||
Account repository:
|
||||
```
|
||||
import scala.concurrent.Future;
|
||||
|
||||
public interface AccountRepository extends AbstractRepository {
|
||||
|
||||
|
|
|
@ -11,7 +11,6 @@
|
|||
</content>
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
<orderEntry type="library" name="Maven: org.scala-lang:scala-library:2.13.0-M1" level="project" />
|
||||
<orderEntry type="library" name="Maven: com.datastax.cassandra:cassandra-driver-core:3.3.0" level="project" />
|
||||
<orderEntry type="library" name="Maven: io.netty:netty-handler:4.0.47.Final" level="project" />
|
||||
<orderEntry type="library" name="Maven: io.netty:netty-buffer:4.0.47.Final" level="project" />
|
||||
|
@ -30,12 +29,9 @@
|
|||
<orderEntry type="library" name="Maven: com.github.jnr:jnr-x86asm:1.0.2" level="project" />
|
||||
<orderEntry type="library" name="Maven: com.github.jnr:jnr-posix:3.0.27" level="project" />
|
||||
<orderEntry type="library" name="Maven: com.github.jnr:jnr-constants:0.9.0" level="project" />
|
||||
<orderEntry type="library" name="Maven: com.diffplug.durian:durian:3.4.0" level="project" />
|
||||
<orderEntry type="library" name="Maven: org.aspectj:aspectjrt:1.8.10" level="project" />
|
||||
<orderEntry type="library" name="Maven: org.aspectj:aspectjweaver:1.8.10" level="project" />
|
||||
<orderEntry type="library" name="Maven: net.javacrumbs.future-converter:future-converter-java8-guava:1.1.0" level="project" />
|
||||
<orderEntry type="library" name="Maven: net.javacrumbs.future-converter:future-converter-common:1.1.0" level="project" />
|
||||
<orderEntry type="library" name="Maven: net.javacrumbs.future-converter:future-converter-java8-common:1.1.0" level="project" />
|
||||
<orderEntry type="library" name="Maven: net.javacrumbs.future-converter:future-converter-guava-common:1.1.0" level="project" />
|
||||
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.6" level="project" />
|
||||
<orderEntry type="library" name="Maven: org.springframework:spring-core:4.3.10.RELEASE" level="project" />
|
||||
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
|
||||
|
|
26
pom.xml
26
pom.xml
|
@ -40,6 +40,9 @@
|
|||
<properties>
|
||||
<dist.id>helenus</dist.id>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
<maven.compiler.target>1.8</maven.compiler.target>
|
||||
<maven.compiler.source>1.8</maven.compiler.source>
|
||||
</properties>
|
||||
|
||||
<repositories>
|
||||
|
@ -50,11 +53,6 @@
|
|||
</repository>
|
||||
</repositories>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.target>1.8</maven.compiler.target>
|
||||
<maven.compiler.source>1.8</maven.compiler.source>
|
||||
</properties>
|
||||
|
||||
<profiles>
|
||||
|
||||
<profile>
|
||||
|
@ -108,18 +106,18 @@
|
|||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-library</artifactId>
|
||||
<version>2.13.0-M1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.datastax.cassandra</groupId>
|
||||
<artifactId>cassandra-driver-core</artifactId>
|
||||
<version>3.3.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.diffplug.durian</groupId>
|
||||
<artifactId>durian</artifactId>
|
||||
<version>3.4.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.aspectj</groupId>
|
||||
<artifactId>aspectjrt</artifactId>
|
||||
|
@ -132,12 +130,6 @@
|
|||
<version>1.8.10</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>net.javacrumbs.future-converter</groupId>
|
||||
<artifactId>future-converter-java8-guava</artifactId>
|
||||
<version>1.1.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
package net.helenus.core;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import com.datastax.driver.core.schemabuilder.SchemaStatement;
|
||||
|
@ -54,80 +55,57 @@ public abstract class AbstractSessionOperations {
|
|||
|
||||
abstract public ConsistencyLevel getDefaultConsistencyLevel();
|
||||
|
||||
|
||||
public PreparedStatement prepare(RegularStatement statement) {
|
||||
|
||||
try {
|
||||
|
||||
log(statement, false);
|
||||
|
||||
return currentSession().prepare(statement);
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
throw translateException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public ListenableFuture<PreparedStatement> prepareAsync(RegularStatement statement) {
|
||||
|
||||
public ListenableFuture<PreparedStatement> prepareAsync(RegularStatement statement) {
|
||||
try {
|
||||
|
||||
log(statement, false);
|
||||
|
||||
return currentSession().prepareAsync(statement);
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
throw translateException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public ResultSet execute(Statement statement, boolean showValues) {
|
||||
|
||||
return executeAsync(statement, showValues).getUninterruptibly();
|
||||
|
||||
}
|
||||
|
||||
public ResultSetFuture executeAsync(Statement statement, boolean showValues) {
|
||||
|
||||
try {
|
||||
|
||||
log(statement, showValues);
|
||||
|
||||
return currentSession().executeAsync(statement);
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
throw translateException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void log(Statement statement, boolean showValues) {
|
||||
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Execute statement " + statement);
|
||||
}
|
||||
|
||||
if (isShowCql()) {
|
||||
|
||||
if (statement instanceof BuiltStatement) {
|
||||
|
||||
BuiltStatement builtStatement = (BuiltStatement) statement;
|
||||
|
||||
if (showValues) {
|
||||
RegularStatement regularStatement = builtStatement.setForceNoValues(true);
|
||||
printCql(regularStatement.getQueryString());
|
||||
} else {
|
||||
printCql(builtStatement.getQueryString());
|
||||
}
|
||||
|
||||
} else if (statement instanceof RegularStatement) {
|
||||
RegularStatement regularStatement = (RegularStatement) statement;
|
||||
printCql(regularStatement.getQueryString());
|
||||
} else {
|
||||
printCql(statement.toString());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -135,11 +113,9 @@ public abstract class AbstractSessionOperations {
|
|||
}
|
||||
|
||||
RuntimeException translateException(RuntimeException e) {
|
||||
|
||||
if (e instanceof HelenusException) {
|
||||
return e;
|
||||
}
|
||||
|
||||
throw new HelenusException(e);
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.function.Function;
|
|||
import com.datastax.driver.core.*;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.diffplug.common.base.Errors;
|
||||
|
||||
import net.helenus.core.operation.*;
|
||||
import net.helenus.core.reflect.HelenusPropertyNode;
|
||||
|
@ -152,12 +153,10 @@ public final class HelenusSession extends AbstractSessionOperations implements C
|
|||
}
|
||||
|
||||
public synchronized Function<Void, Void> commit() throws ConflictingUnitOfWorkException {
|
||||
final Function<Void, Void> f = Function.<Void>identity();
|
||||
synchronized(currentUnitOfWork) {
|
||||
if (currentUnitOfWork != null) {
|
||||
currentUnitOfWork.commit().andThen((it) -> { return f; });
|
||||
currentUnitOfWork = null;
|
||||
}
|
||||
Function<Void, Void> f = Function.<Void>identity();
|
||||
if (currentUnitOfWork != null) {
|
||||
f = Errors.rethrow().<Function<Void, Void>>wrap(currentUnitOfWork::commit).get();
|
||||
currentUnitOfWork = null;
|
||||
}
|
||||
return f;
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package net.helenus.core;
|
||||
|
||||
import com.diffplug.common.base.Errors;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.function.Function;
|
||||
|
||||
|
@ -40,11 +42,11 @@ public class UnitOfWork {
|
|||
* when the work overlaps with other concurrent writers.
|
||||
*/
|
||||
public Function<Void, Void> commit() throws ConflictingUnitOfWorkException {
|
||||
// nested.foreach.commit()
|
||||
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
|
||||
// log.record(txn::provisionalCommit)
|
||||
// examine log for conflicts in read-set and write-set between begin and
|
||||
// provisional commit
|
||||
// examine log for conflicts in read-set and write-set between begin and provisional commit
|
||||
// if (conflict) { throw new ConflictingUnitOfWorkException(this) }
|
||||
// else return function so as to enable commit.andThen(() -> { do something iff commit was successful; })
|
||||
return Function.<Void>identity();
|
||||
}
|
||||
|
||||
|
|
|
@ -15,19 +15,10 @@
|
|||
*/
|
||||
package net.helenus.core.operation;
|
||||
|
||||
import com.datastax.driver.core.PreparedStatement;
|
||||
import com.datastax.driver.core.ResultSet;
|
||||
import com.datastax.driver.core.ResultSetFuture;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.support.Fun;
|
||||
import net.helenus.support.Scala;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import static net.javacrumbs.futureconverter.java8guava.FutureConverter.*;
|
||||
|
||||
|
||||
public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> extends AbstractStatementOperation<E, O> {
|
||||
|
@ -50,27 +41,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
|
|||
return new PreparedOperation<E>(prepareStatement(), this);
|
||||
}
|
||||
|
||||
public ListenableFuture<PreparedOperation<E>> prepareAsync() {
|
||||
|
||||
final O _this = (O) this;
|
||||
|
||||
return Futures.transform(prepareStatementAsync(), new Function<PreparedStatement, PreparedOperation<E>>() {
|
||||
|
||||
@Override
|
||||
public PreparedOperation<E> apply(PreparedStatement preparedStatement) {
|
||||
return new PreparedOperation<E>(preparedStatement, _this);
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
public Future<PreparedOperation<E>> prepareFuture() {
|
||||
return Scala.asFuture(prepareAsync());
|
||||
}
|
||||
|
||||
public E sync() {
|
||||
|
||||
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
|
||||
E result = transform(resultSet);
|
||||
if (cacheable()) {
|
||||
|
@ -79,48 +50,6 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>> ex
|
|||
return result;
|
||||
}
|
||||
|
||||
public ListenableFuture<E> async() {
|
||||
|
||||
ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues);
|
||||
|
||||
ListenableFuture<E> future = Futures.transform(resultSetFuture, new Function<ResultSet, E>() {
|
||||
|
||||
@Override
|
||||
public E apply(ResultSet resultSet) {
|
||||
E result = transform(resultSet);
|
||||
if (cacheable()) {
|
||||
sessionOps.cache(getCacheKey(), result);
|
||||
}
|
||||
return transform(resultSet);
|
||||
}
|
||||
|
||||
}, sessionOps.getExecutor());
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
public CompletableFuture<E> completable() {
|
||||
return toCompletableFuture(async());
|
||||
}
|
||||
|
||||
public Future<E> future() {
|
||||
return Scala.asFuture(async());
|
||||
}
|
||||
|
||||
public <A> Future<Fun.Tuple2<E, A>> future(A a) {
|
||||
return Scala.asFuture(async(), a);
|
||||
}
|
||||
|
||||
public <A, B> Future<Fun.Tuple3<E, A, B>> future(A a, B b) {
|
||||
return Scala.asFuture(async(), a, b);
|
||||
}
|
||||
|
||||
public <A, B, C> Future<Fun.Tuple4<E, A, B, C>> future(A a, B b, C c) {
|
||||
return Scala.asFuture(async(), a, b, c);
|
||||
}
|
||||
|
||||
public <A, B, C, D> Future<Fun.Tuple5<E, A, B, C, D>> future(A a, B b, C c, D d) {
|
||||
return Scala.asFuture(async(), a, b, c, d);
|
||||
}
|
||||
public CompletableFuture<E> async() { return CompletableFuture.supplyAsync(this::sync); }
|
||||
|
||||
}
|
||||
|
|
|
@ -25,15 +25,9 @@ import com.google.common.util.concurrent.Futures;
|
|||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.support.Fun;
|
||||
import net.helenus.support.Scala;
|
||||
import scala.Option;
|
||||
import scala.Some;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
|
||||
extends
|
||||
AbstractStatementOperation<E, O> {
|
||||
extends AbstractStatementOperation<E, O> {
|
||||
|
||||
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
|
||||
super(sessionOperations);
|
||||
|
@ -46,87 +40,32 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
|
|||
}
|
||||
|
||||
public ListenableFuture<PreparedOptionalOperation<E>> prepareAsync() {
|
||||
|
||||
final O _this = (O) this;
|
||||
|
||||
return Futures.transform(prepareStatementAsync(),
|
||||
new Function<PreparedStatement, PreparedOptionalOperation<E>>() {
|
||||
|
||||
@Override
|
||||
public PreparedOptionalOperation<E> apply(PreparedStatement preparedStatement) {
|
||||
return new PreparedOptionalOperation<E>(preparedStatement, _this);
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
public Future<PreparedOptionalOperation<E>> prepareFuture() {
|
||||
return Scala.asFuture(prepareAsync());
|
||||
}
|
||||
|
||||
public Optional<E> sync() {
|
||||
|
||||
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
|
||||
|
||||
return transform(resultSet);
|
||||
}
|
||||
|
||||
public ListenableFuture<Optional<E>> async() {
|
||||
|
||||
ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues);
|
||||
|
||||
ListenableFuture<Optional<E>> future = Futures.transform(resultSetFuture,
|
||||
new Function<ResultSet, Optional<E>>() {
|
||||
|
||||
@Override
|
||||
public Optional<E> apply(ResultSet resultSet) {
|
||||
return transform(resultSet);
|
||||
}
|
||||
|
||||
}, sessionOps.getExecutor());
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
public ListenableFuture<Option<E>> asyncForScala() {
|
||||
|
||||
ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues);
|
||||
|
||||
ListenableFuture<Option<E>> future = Futures.transform(resultSetFuture, new Function<ResultSet, Option<E>>() {
|
||||
|
||||
@Override
|
||||
public Option<E> apply(ResultSet resultSet) {
|
||||
Optional<E> optional = transform(resultSet);
|
||||
if (optional.isPresent()) {
|
||||
return new Some<E>(optional.get());
|
||||
} else {
|
||||
return Option.empty();
|
||||
}
|
||||
}
|
||||
|
||||
}, sessionOps.getExecutor());
|
||||
|
||||
return future;
|
||||
}
|
||||
public Future<Option<E>> future() {
|
||||
return Scala.asFuture(asyncForScala());
|
||||
}
|
||||
|
||||
public <A> Future<Fun.Tuple2<Option<E>, A>> future(A a) {
|
||||
return Scala.asFuture(asyncForScala(), a);
|
||||
}
|
||||
|
||||
public <A, B> Future<Fun.Tuple3<Option<E>, A, B>> future(A a, B b) {
|
||||
return Scala.asFuture(asyncForScala(), a, b);
|
||||
}
|
||||
|
||||
public <A, B, C> Future<Fun.Tuple4<Option<E>, A, B, C>> future(A a, B b, C c) {
|
||||
return Scala.asFuture(asyncForScala(), a, b, c);
|
||||
}
|
||||
|
||||
public <A, B, C, D> Future<Fun.Tuple5<Option<E>, A, B, C, D>> future(A a, B b, C c, D d) {
|
||||
return Scala.asFuture(asyncForScala(), a, b, c, d);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -31,8 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture;
|
|||
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.support.HelenusException;
|
||||
import net.helenus.support.Scala;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
|
||||
public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>> {
|
||||
|
||||
|
@ -257,8 +256,4 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
|
|||
throw new HelenusException("only RegularStatements can be prepared");
|
||||
}
|
||||
|
||||
public Future<PreparedStatement> prepareStatementFuture() {
|
||||
return Scala.asFuture(prepareStatementAsync());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,13 +25,9 @@ import com.google.common.util.concurrent.Futures;
|
|||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
import net.helenus.core.AbstractSessionOperations;
|
||||
import net.helenus.support.Fun;
|
||||
import net.helenus.support.Scala;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
|
||||
extends
|
||||
AbstractStatementOperation<E, O> {
|
||||
extends AbstractStatementOperation<E, O> {
|
||||
|
||||
public AbstractStreamOperation(AbstractSessionOperations sessionOperations) {
|
||||
super(sessionOperations);
|
||||
|
@ -44,85 +40,31 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
|
|||
}
|
||||
|
||||
public ListenableFuture<PreparedStreamOperation<E>> prepareAsync() {
|
||||
|
||||
final O _this = (O) this;
|
||||
|
||||
return Futures.transform(prepareStatementAsync(),
|
||||
new Function<PreparedStatement, PreparedStreamOperation<E>>() {
|
||||
|
||||
@Override
|
||||
public PreparedStreamOperation<E> apply(PreparedStatement preparedStatement) {
|
||||
return new PreparedStreamOperation<E>(preparedStatement, _this);
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
public Future<PreparedStreamOperation<E>> prepareFuture() {
|
||||
return Scala.asFuture(prepareAsync());
|
||||
}
|
||||
|
||||
public Stream<E> sync() {
|
||||
|
||||
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
|
||||
|
||||
return transform(resultSet);
|
||||
}
|
||||
|
||||
public ListenableFuture<Stream<E>> async() {
|
||||
|
||||
ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues);
|
||||
|
||||
ListenableFuture<Stream<E>> future = Futures.transform(resultSetFuture, new Function<ResultSet, Stream<E>>() {
|
||||
|
||||
@Override
|
||||
public Stream<E> apply(ResultSet resultSet) {
|
||||
ListenableFuture<Stream<E>> future = Futures.transform(resultSetFuture,
|
||||
new Function<ResultSet, Stream<E>>() {
|
||||
@Override
|
||||
public Stream<E> apply(ResultSet resultSet) {
|
||||
return transform(resultSet);
|
||||
}
|
||||
|
||||
}, sessionOps.getExecutor());
|
||||
|
||||
}, sessionOps.getExecutor());
|
||||
return future;
|
||||
}
|
||||
|
||||
public ListenableFuture<scala.collection.immutable.Stream<E>> asyncForScala() {
|
||||
|
||||
ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues);
|
||||
|
||||
ListenableFuture<scala.collection.immutable.Stream<E>> future = Futures.transform(resultSetFuture,
|
||||
new Function<ResultSet, scala.collection.immutable.Stream<E>>() {
|
||||
|
||||
@Override
|
||||
public scala.collection.immutable.Stream<E> apply(ResultSet resultSet) {
|
||||
Stream<E> stream = transform(resultSet);
|
||||
return scala.collection.JavaConversions.asScalaIterator(stream.iterator()).toStream();
|
||||
}
|
||||
|
||||
}, sessionOps.getExecutor());
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
public Future<scala.collection.immutable.Stream<E>> future() {
|
||||
return Scala.asFuture(asyncForScala());
|
||||
}
|
||||
|
||||
public <A> Future<Fun.Tuple2<scala.collection.immutable.Stream<E>, A>> future(A a) {
|
||||
return Scala.asFuture(asyncForScala(), a);
|
||||
}
|
||||
|
||||
public <A, B> Future<Fun.Tuple3<scala.collection.immutable.Stream<E>, A, B>> future(A a, B b) {
|
||||
return Scala.asFuture(asyncForScala(), a, b);
|
||||
}
|
||||
|
||||
public <A, B, C> Future<Fun.Tuple4<scala.collection.immutable.Stream<E>, A, B, C>> future(A a, B b, C c) {
|
||||
return Scala.asFuture(asyncForScala(), a, b, c);
|
||||
}
|
||||
|
||||
public <A, B, C, D> Future<Fun.Tuple5<scala.collection.immutable.Stream<E>, A, B, C, D>> future(A a, B b, C c,
|
||||
D d) {
|
||||
return Scala.asFuture(asyncForScala(), a, b, c, d);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,103 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2015 The Helenus Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package net.helenus.support;
|
||||
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
|
||||
import scala.concurrent.Future;
|
||||
import scala.concurrent.impl.Promise.DefaultPromise;
|
||||
|
||||
public final class Scala {
|
||||
|
||||
public static <T> Future<T> asFuture(ListenableFuture<T> future) {
|
||||
final scala.concurrent.Promise<T> promise = new DefaultPromise<T>();
|
||||
Futures.addCallback(future, new FutureCallback<T>() {
|
||||
@Override
|
||||
public void onSuccess(T result) {
|
||||
promise.success(result);
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
promise.failure(t);
|
||||
}
|
||||
});
|
||||
return promise.future();
|
||||
}
|
||||
|
||||
public static <T, A> Future<Fun.Tuple2<T, A>> asFuture(ListenableFuture<T> future, A a) {
|
||||
final scala.concurrent.Promise<Fun.Tuple2<T, A>> promise = new DefaultPromise<Fun.Tuple2<T, A>>();
|
||||
Futures.addCallback(future, new FutureCallback<T>() {
|
||||
@Override
|
||||
public void onSuccess(T result) {
|
||||
promise.success(new Fun.Tuple2<T, A>(result, a));
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
promise.failure(t);
|
||||
}
|
||||
});
|
||||
return promise.future();
|
||||
}
|
||||
|
||||
public static <T, A, B> Future<Fun.Tuple3<T, A, B>> asFuture(ListenableFuture<T> future, A a, B b) {
|
||||
final scala.concurrent.Promise<Fun.Tuple3<T, A, B>> promise = new DefaultPromise<Fun.Tuple3<T, A, B>>();
|
||||
Futures.addCallback(future, new FutureCallback<T>() {
|
||||
@Override
|
||||
public void onSuccess(T result) {
|
||||
promise.success(new Fun.Tuple3<T, A, B>(result, a, b));
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
promise.failure(t);
|
||||
}
|
||||
});
|
||||
return promise.future();
|
||||
}
|
||||
|
||||
public static <T, A, B, C> Future<Fun.Tuple4<T, A, B, C>> asFuture(ListenableFuture<T> future, A a, B b, C c) {
|
||||
final scala.concurrent.Promise<Fun.Tuple4<T, A, B, C>> promise = new DefaultPromise<Fun.Tuple4<T, A, B, C>>();
|
||||
Futures.addCallback(future, new FutureCallback<T>() {
|
||||
@Override
|
||||
public void onSuccess(T result) {
|
||||
promise.success(new Fun.Tuple4<T, A, B, C>(result, a, b, c));
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
promise.failure(t);
|
||||
}
|
||||
});
|
||||
return promise.future();
|
||||
}
|
||||
|
||||
public static <T, A, B, C, D> Future<Fun.Tuple5<T, A, B, C, D>> asFuture(ListenableFuture<T> future, A a, B b, C c,
|
||||
D d) {
|
||||
final scala.concurrent.Promise<Fun.Tuple5<T, A, B, C, D>> promise = new DefaultPromise<Fun.Tuple5<T, A, B, C, D>>();
|
||||
Futures.addCallback(future, new FutureCallback<T>() {
|
||||
@Override
|
||||
public void onSuccess(T result) {
|
||||
promise.success(new Fun.Tuple5<T, A, B, C, D>(result, a, b, c, d));
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
promise.failure(t);
|
||||
}
|
||||
});
|
||||
return promise.future();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in a new issue