From f4dbf34920c7e594fc538dc628c41aad7a9036e7 Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Fri, 4 Aug 2017 10:18:51 -0400 Subject: [PATCH] Remove Scala support and trim Future support at some point I'll re-introduce using Java 8 classes rather than Guava's or Scala's --- README.md | 27 +---- helenus-core.iml | 6 +- pom.xml | 26 ++--- .../core/AbstractSessionOperations.java | 30 +---- .../java/net/helenus/core/HelenusSession.java | 11 +- .../java/net/helenus/core/UnitOfWork.java | 8 +- .../core/operation/AbstractOperation.java | 75 +------------ .../operation/AbstractOptionalOperation.java | 63 +---------- .../operation/AbstractStatementOperation.java | 7 +- .../operation/AbstractStreamOperation.java | 70 +----------- src/main/java/net/helenus/support/Scala.java | 103 ------------------ 11 files changed, 35 insertions(+), 391 deletions(-) delete mode 100644 src/main/java/net/helenus/support/Scala.java diff --git a/README.md b/README.md index 25f9eda..80b470a 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Helenus -Fast and easy, functional style cutting edge Java 8 and Scala 2.11 Cassandra client for C* 3.x +Fast and easy, functional style cutting edge Java 8 Cassandra client for C* 3.x ### Features @@ -9,14 +9,13 @@ Fast and easy, functional style cutting edge Java 8 and Scala 2.11 Cassandra cli * Reactive asynchronous and synchronous API * Provides Java mapping for Tables, Tuples, UDTs (User Defined Type), Collections, UDT Collections, Tuple Collections * Uses lazy mapping in all cases where possible -* Supports Guava ListenableFuture and Scala Future +* Supports Java 8 Futures and Guava ListenableFuture ### Requirements * JVM 8 * Datastax Driver 3.x * Cassandra 3.x -* Scala 2.11+ * Maven ### Maven @@ -32,27 +31,6 @@ Latest release dependency: ``` -Active development dependency for Scala 2.11: -``` - - - net.helenus - helenus-core - 1.2.0_2.11-SNAPSHOT - - - - - - oss-sonatype - oss-sonatype - https://oss.sonatype.org/content/repositories/snapshots/ - - true - - - -``` ### Simple Example @@ -132,7 +110,6 @@ public interface AbstractRepository { Account repository: ``` -import scala.concurrent.Future; public interface AccountRepository extends AbstractRepository { diff --git a/helenus-core.iml b/helenus-core.iml index 386e779..407d821 100644 --- a/helenus-core.iml +++ b/helenus-core.iml @@ -11,7 +11,6 @@ - @@ -30,12 +29,9 @@ + - - - - diff --git a/pom.xml b/pom.xml index 6215c57..b735ed9 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,9 @@ helenus UTF-8 + UTF-8 + 1.8 + 1.8 @@ -50,11 +53,6 @@ - - 1.8 - 1.8 - - @@ -108,18 +106,18 @@ - - org.scala-lang - scala-library - 2.13.0-M1 - - com.datastax.cassandra cassandra-driver-core 3.3.0 + + com.diffplug.durian + durian + 3.4.0 + + org.aspectj aspectjrt @@ -132,12 +130,6 @@ 1.8.10 - - net.javacrumbs.future-converter - future-converter-java8-guava - 1.1.0 - - org.apache.commons commons-lang3 diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index 6f136b8..9493e39 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -16,6 +16,7 @@ package net.helenus.core; import java.io.PrintStream; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import com.datastax.driver.core.schemabuilder.SchemaStatement; @@ -54,80 +55,57 @@ public abstract class AbstractSessionOperations { abstract public ConsistencyLevel getDefaultConsistencyLevel(); + public PreparedStatement prepare(RegularStatement statement) { - try { - log(statement, false); - return currentSession().prepare(statement); - } catch (RuntimeException e) { throw translateException(e); } - } - public ListenableFuture prepareAsync(RegularStatement statement) { - + public ListenableFuture prepareAsync(RegularStatement statement) { try { - log(statement, false); - return currentSession().prepareAsync(statement); - } catch (RuntimeException e) { throw translateException(e); } - } public ResultSet execute(Statement statement, boolean showValues) { - return executeAsync(statement, showValues).getUninterruptibly(); - } public ResultSetFuture executeAsync(Statement statement, boolean showValues) { - try { - log(statement, showValues); - return currentSession().executeAsync(statement); - } catch (RuntimeException e) { throw translateException(e); } - } void log(Statement statement, boolean showValues) { - if (logger.isInfoEnabled()) { logger.info("Execute statement " + statement); } - if (isShowCql()) { - if (statement instanceof BuiltStatement) { - BuiltStatement builtStatement = (BuiltStatement) statement; - if (showValues) { RegularStatement regularStatement = builtStatement.setForceNoValues(true); printCql(regularStatement.getQueryString()); } else { printCql(builtStatement.getQueryString()); } - } else if (statement instanceof RegularStatement) { RegularStatement regularStatement = (RegularStatement) statement; printCql(regularStatement.getQueryString()); } else { printCql(statement.toString()); } - } } @@ -135,11 +113,9 @@ public abstract class AbstractSessionOperations { } RuntimeException translateException(RuntimeException e) { - if (e instanceof HelenusException) { return e; } - throw new HelenusException(e); } diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index cdc968b..3d2fa39 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -26,6 +26,7 @@ import java.util.function.Function; import com.datastax.driver.core.*; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.diffplug.common.base.Errors; import net.helenus.core.operation.*; import net.helenus.core.reflect.HelenusPropertyNode; @@ -152,12 +153,10 @@ public final class HelenusSession extends AbstractSessionOperations implements C } public synchronized Function commit() throws ConflictingUnitOfWorkException { - final Function f = Function.identity(); - synchronized(currentUnitOfWork) { - if (currentUnitOfWork != null) { - currentUnitOfWork.commit().andThen((it) -> { return f; }); - currentUnitOfWork = null; - } + Function f = Function.identity(); + if (currentUnitOfWork != null) { + f = Errors.rethrow().>wrap(currentUnitOfWork::commit).get(); + currentUnitOfWork = null; } return f; } diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 79902f6..61c6dd3 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -1,5 +1,7 @@ package net.helenus.core; +import com.diffplug.common.base.Errors; + import java.util.ArrayList; import java.util.function.Function; @@ -40,11 +42,11 @@ public class UnitOfWork { * when the work overlaps with other concurrent writers. */ public Function commit() throws ConflictingUnitOfWorkException { - // nested.foreach.commit() + nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit)); // log.record(txn::provisionalCommit) - // examine log for conflicts in read-set and write-set between begin and - // provisional commit + // examine log for conflicts in read-set and write-set between begin and provisional commit // if (conflict) { throw new ConflictingUnitOfWorkException(this) } + // else return function so as to enable commit.andThen(() -> { do something iff commit was successful; }) return Function.identity(); } diff --git a/src/main/java/net/helenus/core/operation/AbstractOperation.java b/src/main/java/net/helenus/core/operation/AbstractOperation.java index cb90747..b74b77d 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOperation.java @@ -15,19 +15,10 @@ */ package net.helenus.core.operation; -import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; -import com.google.common.base.Function; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; - import net.helenus.core.AbstractSessionOperations; -import net.helenus.support.Fun; -import net.helenus.support.Scala; -import scala.concurrent.Future; + import java.util.concurrent.CompletableFuture; -import static net.javacrumbs.futureconverter.java8guava.FutureConverter.*; public abstract class AbstractOperation> extends AbstractStatementOperation { @@ -50,27 +41,7 @@ public abstract class AbstractOperation> ex return new PreparedOperation(prepareStatement(), this); } - public ListenableFuture> prepareAsync() { - - final O _this = (O) this; - - return Futures.transform(prepareStatementAsync(), new Function>() { - - @Override - public PreparedOperation apply(PreparedStatement preparedStatement) { - return new PreparedOperation(preparedStatement, _this); - } - - }); - - } - - public Future> prepareFuture() { - return Scala.asFuture(prepareAsync()); - } - public E sync() { - ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); E result = transform(resultSet); if (cacheable()) { @@ -79,48 +50,6 @@ public abstract class AbstractOperation> ex return result; } - public ListenableFuture async() { - - ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues); - - ListenableFuture future = Futures.transform(resultSetFuture, new Function() { - - @Override - public E apply(ResultSet resultSet) { - E result = transform(resultSet); - if (cacheable()) { - sessionOps.cache(getCacheKey(), result); - } - return transform(resultSet); - } - - }, sessionOps.getExecutor()); - - return future; - } - - public CompletableFuture completable() { - return toCompletableFuture(async()); - } - - public Future future() { - return Scala.asFuture(async()); - } - - public Future> future(A a) { - return Scala.asFuture(async(), a); - } - - public Future> future(A a, B b) { - return Scala.asFuture(async(), a, b); - } - - public Future> future(A a, B b, C c) { - return Scala.asFuture(async(), a, b, c); - } - - public Future> future(A a, B b, C c, D d) { - return Scala.asFuture(async(), a, b, c, d); - } + public CompletableFuture async() { return CompletableFuture.supplyAsync(this::sync); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index 82a4655..f1bcb48 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -25,15 +25,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import net.helenus.core.AbstractSessionOperations; -import net.helenus.support.Fun; -import net.helenus.support.Scala; -import scala.Option; -import scala.Some; -import scala.concurrent.Future; public abstract class AbstractOptionalOperation> - extends - AbstractStatementOperation { + extends AbstractStatementOperation { public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) { super(sessionOperations); @@ -46,87 +40,32 @@ public abstract class AbstractOptionalOperation> prepareAsync() { - final O _this = (O) this; - return Futures.transform(prepareStatementAsync(), new Function>() { - @Override public PreparedOptionalOperation apply(PreparedStatement preparedStatement) { return new PreparedOptionalOperation(preparedStatement, _this); } - }); - - } - - public Future> prepareFuture() { - return Scala.asFuture(prepareAsync()); } public Optional sync() { - ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); - return transform(resultSet); } public ListenableFuture> async() { - ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues); - ListenableFuture> future = Futures.transform(resultSetFuture, new Function>() { - @Override public Optional apply(ResultSet resultSet) { return transform(resultSet); } - }, sessionOps.getExecutor()); return future; } - public ListenableFuture> asyncForScala() { - - ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues); - - ListenableFuture> future = Futures.transform(resultSetFuture, new Function>() { - - @Override - public Option apply(ResultSet resultSet) { - Optional optional = transform(resultSet); - if (optional.isPresent()) { - return new Some(optional.get()); - } else { - return Option.empty(); - } - } - - }, sessionOps.getExecutor()); - - return future; - } - public Future> future() { - return Scala.asFuture(asyncForScala()); - } - - public Future, A>> future(A a) { - return Scala.asFuture(asyncForScala(), a); - } - - public Future, A, B>> future(A a, B b) { - return Scala.asFuture(asyncForScala(), a, b); - } - - public Future, A, B, C>> future(A a, B b, C c) { - return Scala.asFuture(asyncForScala(), a, b, c); - } - - public Future, A, B, C, D>> future(A a, B b, C c, D d) { - return Scala.asFuture(asyncForScala(), a, b, c, d); - } - } diff --git a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java index 7b75cae..c05d877 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java @@ -31,8 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture; import net.helenus.core.AbstractSessionOperations; import net.helenus.support.HelenusException; -import net.helenus.support.Scala; -import scala.concurrent.Future; + public abstract class AbstractStatementOperation> { @@ -257,8 +256,4 @@ public abstract class AbstractStatementOperation prepareStatementFuture() { - return Scala.asFuture(prepareStatementAsync()); - } - } diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index dccb7ff..1098337 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -25,13 +25,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import net.helenus.core.AbstractSessionOperations; -import net.helenus.support.Fun; -import net.helenus.support.Scala; -import scala.concurrent.Future; public abstract class AbstractStreamOperation> - extends - AbstractStatementOperation { + extends AbstractStatementOperation { public AbstractStreamOperation(AbstractSessionOperations sessionOperations) { super(sessionOperations); @@ -44,85 +40,31 @@ public abstract class AbstractStreamOperation> prepareAsync() { - final O _this = (O) this; - return Futures.transform(prepareStatementAsync(), new Function>() { - @Override public PreparedStreamOperation apply(PreparedStatement preparedStatement) { return new PreparedStreamOperation(preparedStatement, _this); } - }); - - } - - public Future> prepareFuture() { - return Scala.asFuture(prepareAsync()); } public Stream sync() { - ResultSet resultSet = sessionOps.executeAsync(options(buildStatement()), showValues).getUninterruptibly(); - return transform(resultSet); } public ListenableFuture> async() { - ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues); - - ListenableFuture> future = Futures.transform(resultSetFuture, new Function>() { - - @Override - public Stream apply(ResultSet resultSet) { + ListenableFuture> future = Futures.transform(resultSetFuture, + new Function>() { + @Override + public Stream apply(ResultSet resultSet) { return transform(resultSet); } - - }, sessionOps.getExecutor()); - + }, sessionOps.getExecutor()); return future; } - public ListenableFuture> asyncForScala() { - - ResultSetFuture resultSetFuture = sessionOps.executeAsync(options(buildStatement()), showValues); - - ListenableFuture> future = Futures.transform(resultSetFuture, - new Function>() { - - @Override - public scala.collection.immutable.Stream apply(ResultSet resultSet) { - Stream stream = transform(resultSet); - return scala.collection.JavaConversions.asScalaIterator(stream.iterator()).toStream(); - } - - }, sessionOps.getExecutor()); - - return future; - } - - public Future> future() { - return Scala.asFuture(asyncForScala()); - } - - public Future, A>> future(A a) { - return Scala.asFuture(asyncForScala(), a); - } - - public Future, A, B>> future(A a, B b) { - return Scala.asFuture(asyncForScala(), a, b); - } - - public Future, A, B, C>> future(A a, B b, C c) { - return Scala.asFuture(asyncForScala(), a, b, c); - } - - public Future, A, B, C, D>> future(A a, B b, C c, - D d) { - return Scala.asFuture(asyncForScala(), a, b, c, d); - } - } diff --git a/src/main/java/net/helenus/support/Scala.java b/src/main/java/net/helenus/support/Scala.java deleted file mode 100644 index 7486b08..0000000 --- a/src/main/java/net/helenus/support/Scala.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (C) 2015 The Helenus Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.helenus.support; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; - -import scala.concurrent.Future; -import scala.concurrent.impl.Promise.DefaultPromise; - -public final class Scala { - - public static Future asFuture(ListenableFuture future) { - final scala.concurrent.Promise promise = new DefaultPromise(); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(T result) { - promise.success(result); - } - @Override - public void onFailure(Throwable t) { - promise.failure(t); - } - }); - return promise.future(); - } - - public static Future> asFuture(ListenableFuture future, A a) { - final scala.concurrent.Promise> promise = new DefaultPromise>(); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(T result) { - promise.success(new Fun.Tuple2(result, a)); - } - @Override - public void onFailure(Throwable t) { - promise.failure(t); - } - }); - return promise.future(); - } - - public static Future> asFuture(ListenableFuture future, A a, B b) { - final scala.concurrent.Promise> promise = new DefaultPromise>(); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(T result) { - promise.success(new Fun.Tuple3(result, a, b)); - } - @Override - public void onFailure(Throwable t) { - promise.failure(t); - } - }); - return promise.future(); - } - - public static Future> asFuture(ListenableFuture future, A a, B b, C c) { - final scala.concurrent.Promise> promise = new DefaultPromise>(); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(T result) { - promise.success(new Fun.Tuple4(result, a, b, c)); - } - @Override - public void onFailure(Throwable t) { - promise.failure(t); - } - }); - return promise.future(); - } - - public static Future> asFuture(ListenableFuture future, A a, B b, C c, - D d) { - final scala.concurrent.Promise> promise = new DefaultPromise>(); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(T result) { - promise.success(new Fun.Tuple5(result, a, b, c, d)); - } - @Override - public void onFailure(Throwable t) { - promise.failure(t); - } - }); - return promise.future(); - } - -}