diff --git a/helenus-core.iml b/helenus-core.iml index 77f2a38..42b1cc5 100644 --- a/helenus-core.iml +++ b/helenus-core.iml @@ -11,8 +11,6 @@ - - diff --git a/pom.xml b/pom.xml index 89abb41..cd51b84 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 net.helenus helenus-core - 2.0.5-SNAPSHOT + 2.0.6-SNAPSHOT jar helenus diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index 36a5145..6f136b8 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -52,6 +52,8 @@ public abstract class AbstractSessionOperations { abstract public ColumnValuePreparer getValuePreparer(); + abstract public ConsistencyLevel getDefaultConsistencyLevel(); + public PreparedStatement prepare(RegularStatement statement) { try { diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index 15cff2b..df5c6fe 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -46,6 +46,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C private final CodecRegistry registry; private volatile String usingKeyspace; private volatile boolean showCql; + private final ConsistencyLevel defaultConsistencyLevel; private final PrintStream printStream; private final SessionRepository sessionRepository; private final Executor executor; @@ -59,7 +60,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C HelenusSession(Session session, String usingKeyspace, CodecRegistry registry, boolean showCql, PrintStream printStream, SessionRepositoryBuilder sessionRepositoryBuilder, Executor executor, - boolean dropSchemaOnClose) { + boolean dropSchemaOnClose, ConsistencyLevel consistencyLevel) { this.session = session; this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry; this.usingKeyspace = Objects.requireNonNull(usingKeyspace, @@ -69,6 +70,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C this.sessionRepository = sessionRepositoryBuilder.build(); this.executor = executor; this.dropSchemaOnClose = dropSchemaOnClose; + this.defaultConsistencyLevel = consistencyLevel; this.valueProvider = new RowColumnValueProvider(this.sessionRepository); this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository); @@ -134,6 +136,10 @@ public final class HelenusSession extends AbstractSessionOperations implements C return valuePreparer; } + public ConsistencyLevel getDefaultConsistencyLevel() { + return defaultConsistencyLevel; + } + public Metadata getMetadata() { return metadata; } public synchronized UnitOfWork begin() { diff --git a/src/main/java/net/helenus/core/SessionInitializer.java b/src/main/java/net/helenus/core/SessionInitializer.java index 04470f5..21414b4 100644 --- a/src/main/java/net/helenus/core/SessionInitializer.java +++ b/src/main/java/net/helenus/core/SessionInitializer.java @@ -15,19 +15,8 @@ */ package net.helenus.core; -import java.io.IOException; -import java.io.PrintStream; -import java.util.*; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.function.Consumer; -import java.util.stream.Collector; -import java.util.stream.Collectors; -import java.util.stream.Stream; - import com.datastax.driver.core.*; import com.google.common.util.concurrent.MoreExecutors; - import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.HelenusEntityType; import net.helenus.mapping.value.ColumnValuePreparer; @@ -35,235 +24,253 @@ import net.helenus.mapping.value.ColumnValueProvider; import net.helenus.support.HelenusException; import net.helenus.support.PackageUtil; +import java.io.IOException; +import java.io.PrintStream; +import java.util.*; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + public final class SessionInitializer extends AbstractSessionOperations { - private final Session session; - private CodecRegistry registry; - private String usingKeyspace; - private boolean showCql = false; - private PrintStream printStream = System.out; - private Executor executor = MoreExecutors.directExecutor(); + private final Session session; + private CodecRegistry registry; + private String usingKeyspace; + private boolean showCql = false; + private ConsistencyLevel consistencyLevel; + private PrintStream printStream = System.out; + private Executor executor = MoreExecutors.directExecutor(); - private SessionRepositoryBuilder sessionRepository; + private SessionRepositoryBuilder sessionRepository; - private boolean dropUnusedColumns = false; - private boolean dropUnusedIndexes = false; + private boolean dropUnusedColumns = false; + private boolean dropUnusedIndexes = false; - private KeyspaceMetadata keyspaceMetadata; + private KeyspaceMetadata keyspaceMetadata; - private final List initList = new ArrayList(); - private AutoDdl autoDdl = AutoDdl.UPDATE; + private final List initList = new ArrayList(); + private AutoDdl autoDdl = AutoDdl.UPDATE; - SessionInitializer(Session session) { - this.session = Objects.requireNonNull(session, "empty session"); - this.usingKeyspace = session.getLoggedKeyspace(); // can be null - this.sessionRepository = new SessionRepositoryBuilder(session); - } + SessionInitializer(Session session) { + this.session = Objects.requireNonNull(session, "empty session"); + this.usingKeyspace = session.getLoggedKeyspace(); // can be null + this.sessionRepository = new SessionRepositoryBuilder(session); + } - @Override - public Session currentSession() { - return session; - } + @Override + public Session currentSession() { + return session; + } - @Override - public String usingKeyspace() { - return usingKeyspace; - } + @Override + public String usingKeyspace() { + return usingKeyspace; + } - @Override - public Executor getExecutor() { - return executor; - } + @Override + public Executor getExecutor() { + return executor; + } - @Override - public SessionRepository getSessionRepository() { - throw new HelenusException("not expected to call"); - } + @Override + public SessionRepository getSessionRepository() { + throw new HelenusException("not expected to call"); + } - @Override - public ColumnValueProvider getValueProvider() { - throw new HelenusException("not expected to call"); - } + @Override + public ColumnValueProvider getValueProvider() { + throw new HelenusException("not expected to call"); + } - @Override - public ColumnValuePreparer getValuePreparer() { - throw new HelenusException("not expected to call"); - } + @Override + public ColumnValuePreparer getValuePreparer() { + throw new HelenusException("not expected to call"); + } - public SessionInitializer showCql() { - this.showCql = true; - return this; - } + public SessionInitializer showCql() { + this.showCql = true; + return this; + } - public SessionInitializer showCql(boolean enabled) { - this.showCql = enabled; - return this; - } + public SessionInitializer showCql(boolean enabled) { + this.showCql = enabled; + return this; + } - @Override - public PrintStream getPrintStream() { - return printStream; - } + public SessionInitializer consistencyLevel(ConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + return this; + } - public SessionInitializer printTo(PrintStream out) { - this.printStream = out; - return this; - } + public ConsistencyLevel getDefaultConsistencyLevel() { + return consistencyLevel; + } - public SessionInitializer withExecutor(Executor executor) { - Objects.requireNonNull(executor, "empty executor"); - this.executor = executor; - return this; - } + @Override + public PrintStream getPrintStream() { + return printStream; + } - public SessionInitializer withCachingExecutor() { - this.executor = Executors.newCachedThreadPool(); - return this; - } + public SessionInitializer printTo(PrintStream out) { + this.printStream = out; + return this; + } - public SessionInitializer dropUnusedColumns(boolean enabled) { - this.dropUnusedColumns = enabled; - return this; - } + public SessionInitializer withExecutor(Executor executor) { + Objects.requireNonNull(executor, "empty executor"); + this.executor = executor; + return this; + } - public SessionInitializer dropUnusedIndexes(boolean enabled) { - this.dropUnusedIndexes = enabled; - return this; - } + public SessionInitializer withCachingExecutor() { + this.executor = Executors.newCachedThreadPool(); + return this; + } + + public SessionInitializer dropUnusedColumns(boolean enabled) { + this.dropUnusedColumns = enabled; + return this; + } + + public SessionInitializer dropUnusedIndexes(boolean enabled) { + this.dropUnusedIndexes = enabled; + return this; + } public SessionInitializer withCodecRegistry(CodecRegistry registry) { - this.registry = registry; + this.registry = registry; return this; } @Override - public boolean isShowCql() { - return showCql; - } + public boolean isShowCql() { + return showCql; + } - public SessionInitializer addPackage(String packageName) { - try { - PackageUtil.getClasses(packageName).stream().filter(c -> c.isInterface() && !c.isAnnotation()) - .forEach(initList::add); - } catch (IOException | ClassNotFoundException e) { - throw new HelenusException("fail to add package " + packageName, e); + public SessionInitializer addPackage(String packageName) { + try { + PackageUtil.getClasses(packageName).stream().filter(c -> c.isInterface() && !c.isAnnotation()) + .forEach(initList::add); } - return this; - } + catch (IOException | ClassNotFoundException e) { + throw new HelenusException("fail to add package " + packageName, e); + } + return this; + } - public SessionInitializer add(Object... dsls) { - Objects.requireNonNull(dsls, "dsls is empty"); - int len = dsls.length; - for (int i = 0; i != len; ++i) { - Object obj = Objects.requireNonNull(dsls[i], "element " + i + " is empty"); - initList.add(obj); - } - return this; - } + public SessionInitializer add(Object... dsls) { + Objects.requireNonNull(dsls, "dsls is empty"); + int len = dsls.length; + for (int i = 0; i != len; ++i) { + Object obj = Objects.requireNonNull(dsls[i], "element " + i + " is empty"); + initList.add(obj); + } + return this; + } - public SessionInitializer autoValidate() { - this.autoDdl = AutoDdl.VALIDATE; - return this; - } + public SessionInitializer autoValidate() { + this.autoDdl = AutoDdl.VALIDATE; + return this; + } - public SessionInitializer autoUpdate() { - this.autoDdl = AutoDdl.UPDATE; - return this; - } + public SessionInitializer autoUpdate() { + this.autoDdl = AutoDdl.UPDATE; + return this; + } - public SessionInitializer autoCreate() { - this.autoDdl = AutoDdl.CREATE; - return this; - } + public SessionInitializer autoCreate() { + this.autoDdl = AutoDdl.CREATE; + return this; + } - public SessionInitializer autoCreateDrop() { - this.autoDdl = AutoDdl.CREATE_DROP; - return this; - } + public SessionInitializer autoCreateDrop() { + this.autoDdl = AutoDdl.CREATE_DROP; + return this; + } - public SessionInitializer auto(AutoDdl autoDdl) { - this.autoDdl = autoDdl; - return this; - } + public SessionInitializer auto(AutoDdl autoDdl) { + this.autoDdl = autoDdl; + return this; + } - public SessionInitializer use(String keyspace) { - session.execute(SchemaUtil.use(keyspace, false)); - this.usingKeyspace = keyspace; - return this; - } + public SessionInitializer use(String keyspace) { + session.execute(SchemaUtil.use(keyspace, false)); + this.usingKeyspace = keyspace; + return this; + } - public SessionInitializer use(String keyspace, boolean forceQuote) { - session.execute(SchemaUtil.use(keyspace, forceQuote)); - this.usingKeyspace = keyspace; - return this; - } + public SessionInitializer use(String keyspace, boolean forceQuote) { + session.execute(SchemaUtil.use(keyspace, forceQuote)); + this.usingKeyspace = keyspace; + return this; + } - public void singleton() { + public void singleton() { - Helenus.setSession(get()); - } + Helenus.setSession(get()); + } - public synchronized HelenusSession get() { - initialize(); - return new HelenusSession(session, usingKeyspace, registry, showCql, printStream, sessionRepository, executor, - autoDdl == AutoDdl.CREATE_DROP); - } + public synchronized HelenusSession get() { + initialize(); + return new HelenusSession(session, usingKeyspace, registry, showCql, printStream, sessionRepository, executor, + autoDdl == AutoDdl.CREATE_DROP, consistencyLevel); + } - private void initialize() { + private void initialize() { - Objects.requireNonNull(usingKeyspace, "please define keyspace by 'use' operator"); + Objects.requireNonNull(usingKeyspace, "please define keyspace by 'use' operator"); - initList.forEach(dsl -> sessionRepository.add(dsl)); + initList.forEach(dsl -> sessionRepository.add(dsl)); - TableOperations tableOps = new TableOperations(this, dropUnusedColumns, dropUnusedIndexes); - UserTypeOperations userTypeOps = new UserTypeOperations(this, dropUnusedColumns); + TableOperations tableOps = new TableOperations(this, dropUnusedColumns, dropUnusedIndexes); + UserTypeOperations userTypeOps = new UserTypeOperations(this, dropUnusedColumns); - switch (autoDdl) { + switch (autoDdl) { - case CREATE_DROP : + case CREATE_DROP: - // Drop tables first, otherwise a `DROP TYPE ...` will fail as the type is still referenced - // by a table. - sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE) - .forEach(e -> tableOps.dropTable(e)); + // Drop tables first, otherwise a `DROP TYPE ...` will fail as the type is still referenced + // by a table. + sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE) + .forEach(e -> tableOps.dropTable(e)); - eachUserTypeInReverseOrder(userTypeOps, e -> userTypeOps.dropUserType(e)); + eachUserTypeInReverseOrder(userTypeOps, e -> userTypeOps.dropUserType(e)); - // FALLTHRU to CREATE case (read: the absence of a `break;` statement here is intentional!) - case CREATE : + // FALLTHRU to CREATE case (read: the absence of a `break;` statement here is intentional!) + case CREATE: - eachUserTypeInOrder(userTypeOps, e -> userTypeOps.createUserType(e)); + eachUserTypeInOrder(userTypeOps, e -> userTypeOps.createUserType(e)); - sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE) - .forEach(e -> tableOps.createTable(e)); + sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE) + .forEach(e -> tableOps.createTable(e)); - break; + break; - case VALIDATE : + case VALIDATE: - eachUserTypeInOrder(userTypeOps, e -> userTypeOps.validateUserType(getUserType(e), e)); + eachUserTypeInOrder(userTypeOps, e -> userTypeOps.validateUserType(getUserType(e), e)); - sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE) - .forEach(e -> tableOps.validateTable(getTableMetadata(e), e)); - break; + sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE) + .forEach(e -> tableOps.validateTable(getTableMetadata(e), e)); + break; - case UPDATE : + case UPDATE: - eachUserTypeInOrder(userTypeOps, e -> userTypeOps.updateUserType(getUserType(e), e)); + eachUserTypeInOrder(userTypeOps, e -> userTypeOps.updateUserType(getUserType(e), e)); - sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE) - .forEach(e -> tableOps.updateTable(getTableMetadata(e), e)); - break; + sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE) + .forEach(e -> tableOps.updateTable(getTableMetadata(e), e)); + break; - } + } - KeyspaceMetadata km = getKeyspaceMetadata(); + KeyspaceMetadata km = getKeyspaceMetadata(); - for (UserType userType : km.getUserTypes()) { - sessionRepository.addUserType(userType.getTypeName(), userType); - } + for (UserType userType : km.getUserTypes()) { + sessionRepository.addUserType(userType.getTypeName(), userType); + } - } + } private void eachUserTypeInOrder(UserTypeOperations userTypeOps, Consumer action) { @@ -278,9 +285,11 @@ public final class SessionInitializer extends AbstractSessionOperations { } private void eachUserTypeInReverseOrder(UserTypeOperations userTypeOps, Consumer action) { - ArrayDeque deque = new ArrayDeque<>(); - eachUserTypeInOrder(userTypeOps, e -> deque.addFirst(e)); - deque.stream().forEach(e -> {action.accept(e); }); + ArrayDeque deque = new ArrayDeque<>(); + eachUserTypeInOrder(userTypeOps, e -> deque.addFirst(e)); + deque.stream().forEach(e -> { + action.accept(e); + }); /* Set processedSet = new HashSet(); Set stack = new HashSet(); @@ -297,39 +306,39 @@ public final class SessionInitializer extends AbstractSessionOperations { } private void eachUserTypeInRecursion(HelenusEntity e, Set processedSet, Set stack, - UserTypeOperations userTypeOps, Consumer action) { + UserTypeOperations userTypeOps, Consumer action) { - stack.add(e); + stack.add(e); - Collection createBefore = sessionRepository.getUserTypeUses(e); + Collection createBefore = sessionRepository.getUserTypeUses(e); - for (HelenusEntity be : createBefore) { - if (!processedSet.contains(be) && !stack.contains(be)) { - eachUserTypeInRecursion(be, processedSet, stack, userTypeOps, action); - processedSet.add(be); - } - } + for (HelenusEntity be : createBefore) { + if (!processedSet.contains(be) && !stack.contains(be)) { + eachUserTypeInRecursion(be, processedSet, stack, userTypeOps, action); + processedSet.add(be); + } + } - if (!processedSet.contains(e)) { - action.accept(e); - processedSet.add(e); - } + if (!processedSet.contains(e)) { + action.accept(e); + processedSet.add(e); + } - } + } - private KeyspaceMetadata getKeyspaceMetadata() { - if (keyspaceMetadata == null) { - keyspaceMetadata = session.getCluster().getMetadata().getKeyspace(usingKeyspace.toLowerCase()); - } - return keyspaceMetadata; - } + private KeyspaceMetadata getKeyspaceMetadata() { + if (keyspaceMetadata == null) { + keyspaceMetadata = session.getCluster().getMetadata().getKeyspace(usingKeyspace.toLowerCase()); + } + return keyspaceMetadata; + } - private TableMetadata getTableMetadata(HelenusEntity entity) { - return getKeyspaceMetadata().getTable(entity.getName().getName()); + private TableMetadata getTableMetadata(HelenusEntity entity) { + return getKeyspaceMetadata().getTable(entity.getName().getName()); - } + } - private UserType getUserType(HelenusEntity entity) { - return getKeyspaceMetadata().getUserType(entity.getName().getName()); - } + private UserType getUserType(HelenusEntity entity) { + return getKeyspaceMetadata().getUserType(entity.getName().getName()); + } } diff --git a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java index c15b372..7b75cae 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java @@ -52,6 +52,7 @@ public abstract class AbstractStatementOperation