diff --git a/README.md b/README.md
index 25f9eda..80b470a 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
# Helenus
-Fast and easy, functional style cutting edge Java 8 and Scala 2.11 Cassandra client for C* 3.x
+Fast and easy, functional style cutting edge Java 8 Cassandra client for C* 3.x
### Features
@@ -9,14 +9,13 @@ Fast and easy, functional style cutting edge Java 8 and Scala 2.11 Cassandra cli
* Reactive asynchronous and synchronous API
* Provides Java mapping for Tables, Tuples, UDTs (User Defined Type), Collections, UDT Collections, Tuple Collections
* Uses lazy mapping in all cases where possible
-* Supports Guava ListenableFuture and Scala Future
+* Supports Java 8 Futures and Guava ListenableFuture
### Requirements
* JVM 8
* Datastax Driver 3.x
* Cassandra 3.x
-* Scala 2.11+
* Maven
### Maven
@@ -32,27 +31,6 @@ Latest release dependency:
```
-Active development dependency for Scala 2.11:
-```
-
-
- net.helenus
- helenus-core
- 1.2.0_2.11-SNAPSHOT
-
-
-
-
-
- oss-sonatype
- oss-sonatype
- https://oss.sonatype.org/content/repositories/snapshots/
-
- true
-
-
-
-```
### Simple Example
@@ -132,7 +110,6 @@ public interface AbstractRepository {
Account repository:
```
-import scala.concurrent.Future;
public interface AccountRepository extends AbstractRepository {
diff --git a/helenus-core.iml b/helenus-core.iml
index 386e779..407d821 100644
--- a/helenus-core.iml
+++ b/helenus-core.iml
@@ -11,7 +11,6 @@
-
@@ -30,12 +29,9 @@
+
-
-
-
-
diff --git a/pom.xml b/pom.xml
index 6215c57..b735ed9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,6 +40,9 @@
helenus
UTF-8
+ UTF-8
+ 1.8
+ 1.8
@@ -50,11 +53,6 @@
-
- 1.8
- 1.8
-
-
@@ -108,18 +106,18 @@
-
- org.scala-lang
- scala-library
- 2.13.0-M1
-
-
com.datastax.cassandra
cassandra-driver-core
3.3.0
+
+ com.diffplug.durian
+ durian
+ 3.4.0
+
+
org.aspectj
aspectjrt
@@ -132,12 +130,6 @@
1.8.10
-
- net.javacrumbs.future-converter
- future-converter-java8-guava
- 1.1.0
-
-
org.apache.commons
commons-lang3
diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java
index 6f136b8..9493e39 100644
--- a/src/main/java/net/helenus/core/AbstractSessionOperations.java
+++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java
@@ -16,6 +16,7 @@
package net.helenus.core;
import java.io.PrintStream;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import com.datastax.driver.core.schemabuilder.SchemaStatement;
@@ -54,80 +55,57 @@ public abstract class AbstractSessionOperations {
abstract public ConsistencyLevel getDefaultConsistencyLevel();
+
public PreparedStatement prepare(RegularStatement statement) {
-
try {
-
log(statement, false);
-
return currentSession().prepare(statement);
-
} catch (RuntimeException e) {
throw translateException(e);
}
-
}
- public ListenableFuture prepareAsync(RegularStatement statement) {
-
+ public ListenableFuture prepareAsync(RegularStatement statement) {
try {
-
log(statement, false);
-
return currentSession().prepareAsync(statement);
-
} catch (RuntimeException e) {
throw translateException(e);
}
-
}
public ResultSet execute(Statement statement, boolean showValues) {
-
return executeAsync(statement, showValues).getUninterruptibly();
-
}
public ResultSetFuture executeAsync(Statement statement, boolean showValues) {
-
try {
-
log(statement, showValues);
-
return currentSession().executeAsync(statement);
-
} catch (RuntimeException e) {
throw translateException(e);
}
-
}
void log(Statement statement, boolean showValues) {
-
if (logger.isInfoEnabled()) {
logger.info("Execute statement " + statement);
}
-
if (isShowCql()) {
-
if (statement instanceof BuiltStatement) {
-
BuiltStatement builtStatement = (BuiltStatement) statement;
-
if (showValues) {
RegularStatement regularStatement = builtStatement.setForceNoValues(true);
printCql(regularStatement.getQueryString());
} else {
printCql(builtStatement.getQueryString());
}
-
} else if (statement instanceof RegularStatement) {
RegularStatement regularStatement = (RegularStatement) statement;
printCql(regularStatement.getQueryString());
} else {
printCql(statement.toString());
}
-
}
}
@@ -135,11 +113,9 @@ public abstract class AbstractSessionOperations {
}
RuntimeException translateException(RuntimeException e) {
-
if (e instanceof HelenusException) {
return e;
}
-
throw new HelenusException(e);
}
diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java
index cdc968b..3d2fa39 100644
--- a/src/main/java/net/helenus/core/HelenusSession.java
+++ b/src/main/java/net/helenus/core/HelenusSession.java
@@ -26,6 +26,7 @@ import java.util.function.Function;
import com.datastax.driver.core.*;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.diffplug.common.base.Errors;
import net.helenus.core.operation.*;
import net.helenus.core.reflect.HelenusPropertyNode;
@@ -152,12 +153,10 @@ public final class HelenusSession extends AbstractSessionOperations implements C
}
public synchronized Function commit() throws ConflictingUnitOfWorkException {
- final Function f = Function.identity();
- synchronized(currentUnitOfWork) {
- if (currentUnitOfWork != null) {
- currentUnitOfWork.commit().andThen((it) -> { return f; });
- currentUnitOfWork = null;
- }
+ Function f = Function.identity();
+ if (currentUnitOfWork != null) {
+ f = Errors.rethrow().>wrap(currentUnitOfWork::commit).get();
+ currentUnitOfWork = null;
}
return f;
}
diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java
index 79902f6..61c6dd3 100644
--- a/src/main/java/net/helenus/core/UnitOfWork.java
+++ b/src/main/java/net/helenus/core/UnitOfWork.java
@@ -1,5 +1,7 @@
package net.helenus.core;
+import com.diffplug.common.base.Errors;
+
import java.util.ArrayList;
import java.util.function.Function;
@@ -40,11 +42,11 @@ public class UnitOfWork {
* when the work overlaps with other concurrent writers.
*/
public Function commit() throws ConflictingUnitOfWorkException {
- // nested.foreach.commit()
+ nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
// log.record(txn::provisionalCommit)
- // examine log for conflicts in read-set and write-set between begin and
- // provisional commit
+ // examine log for conflicts in read-set and write-set between begin and provisional commit
// if (conflict) { throw new ConflictingUnitOfWorkException(this) }
+ // else return function so as to enable commit.andThen(() -> { do something iff commit was successful; })
return Function.identity();
}
diff --git a/src/main/java/net/helenus/core/operation/AbstractOperation.java b/src/main/java/net/helenus/core/operation/AbstractOperation.java
index cb90747..b74b77d 100644
--- a/src/main/java/net/helenus/core/operation/AbstractOperation.java
+++ b/src/main/java/net/helenus/core/operation/AbstractOperation.java
@@ -15,19 +15,10 @@
*/
package net.helenus.core.operation;
-import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
import net.helenus.core.AbstractSessionOperations;
-import net.helenus.support.Fun;
-import net.helenus.support.Scala;
-import scala.concurrent.Future;
+
import java.util.concurrent.CompletableFuture;
-import static net.javacrumbs.futureconverter.java8guava.FutureConverter.*;
public abstract class AbstractOperation> extends AbstractStatementOperation {
@@ -50,27 +41,7 @@ public abstract class AbstractOperation> ex
return new PreparedOperation(prepareStatement(), this);
}
- public ListenableFuture> prepareAsync() {
-
- final O _this = (O) this;
-
- return Futures.transform(prepareStatementAsync(), new Function>() {
-
- @Override
- public PreparedOperation apply(PreparedStatement preparedStatement) {
- return new PreparedOperation(preparedStatement, _this);
- }
-
- });
-
- }
-
- public Future> prepareFuture() {
- return Scala.asFuture(prepareAsync());
- }
-
public E sync() {
-
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
E result = transform(resultSet);
if (cacheable()) {
@@ -79,48 +50,6 @@ public abstract class AbstractOperation> ex
return result;
}
- public ListenableFuture async() {
-
- ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues);
-
- ListenableFuture future = Futures.transform(resultSetFuture, new Function() {
-
- @Override
- public E apply(ResultSet resultSet) {
- E result = transform(resultSet);
- if (cacheable()) {
- sessionOps.cache(getCacheKey(), result);
- }
- return transform(resultSet);
- }
-
- }, sessionOps.getExecutor());
-
- return future;
- }
-
- public CompletableFuture completable() {
- return toCompletableFuture(async());
- }
-
- public Future future() {
- return Scala.asFuture(async());
- }
-
- public Future> future(A a) {
- return Scala.asFuture(async(), a);
- }
-
- public Future> future(A a, B b) {
- return Scala.asFuture(async(), a, b);
- }
-
- public Future> future(A a, B b, C c) {
- return Scala.asFuture(async(), a, b, c);
- }
-
- public Future> future(A a, B b, C c, D d) {
- return Scala.asFuture(async(), a, b, c, d);
- }
+ public CompletableFuture async() { return CompletableFuture.supplyAsync(this::sync); }
}
diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java
index 82a4655..f1bcb48 100644
--- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java
+++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java
@@ -25,15 +25,9 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import net.helenus.core.AbstractSessionOperations;
-import net.helenus.support.Fun;
-import net.helenus.support.Scala;
-import scala.Option;
-import scala.Some;
-import scala.concurrent.Future;
public abstract class AbstractOptionalOperation>
- extends
- AbstractStatementOperation {
+ extends AbstractStatementOperation {
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
@@ -46,87 +40,32 @@ public abstract class AbstractOptionalOperation> prepareAsync() {
-
final O _this = (O) this;
-
return Futures.transform(prepareStatementAsync(),
new Function>() {
-
@Override
public PreparedOptionalOperation apply(PreparedStatement preparedStatement) {
return new PreparedOptionalOperation(preparedStatement, _this);
}
-
});
-
- }
-
- public Future> prepareFuture() {
- return Scala.asFuture(prepareAsync());
}
public Optional sync() {
-
ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly();
-
return transform(resultSet);
}
public ListenableFuture> async() {
-
ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues);
-
ListenableFuture> future = Futures.transform(resultSetFuture,
new Function>() {
-
@Override
public Optional apply(ResultSet resultSet) {
return transform(resultSet);
}
-
}, sessionOps.getExecutor());
return future;
}
- public ListenableFuture