Add the LOCAL_* consistency levels. Add a way to provide a default consistency level for all session operations.

This commit is contained in:
Greg Burd 2017-08-02 12:54:01 -04:00
parent 18cfc85f45
commit b44c898682
6 changed files with 261 additions and 220 deletions

View file

@ -11,8 +11,6 @@
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-core:3.2.2" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.github.ben-manes.caffeine:caffeine:2.2.6" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-library:2.13.0-M1" level="project" />
<orderEntry type="library" name="Maven: com.datastax.cassandra:cassandra-driver-core:3.3.0" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty-handler:4.0.47.Final" level="project" />

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.helenus</groupId>
<artifactId>helenus-core</artifactId>
<version>2.0.5-SNAPSHOT</version>
<version>2.0.6-SNAPSHOT</version>
<packaging>jar</packaging>
<name>helenus</name>

View file

@ -52,6 +52,8 @@ public abstract class AbstractSessionOperations {
abstract public ColumnValuePreparer getValuePreparer();
abstract public ConsistencyLevel getDefaultConsistencyLevel();
public PreparedStatement prepare(RegularStatement statement) {
try {

View file

@ -46,6 +46,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
private final CodecRegistry registry;
private volatile String usingKeyspace;
private volatile boolean showCql;
private final ConsistencyLevel defaultConsistencyLevel;
private final PrintStream printStream;
private final SessionRepository sessionRepository;
private final Executor executor;
@ -59,7 +60,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
HelenusSession(Session session, String usingKeyspace, CodecRegistry registry, boolean showCql,
PrintStream printStream, SessionRepositoryBuilder sessionRepositoryBuilder, Executor executor,
boolean dropSchemaOnClose) {
boolean dropSchemaOnClose, ConsistencyLevel consistencyLevel) {
this.session = session;
this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry;
this.usingKeyspace = Objects.requireNonNull(usingKeyspace,
@ -69,6 +70,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
this.sessionRepository = sessionRepositoryBuilder.build();
this.executor = executor;
this.dropSchemaOnClose = dropSchemaOnClose;
this.defaultConsistencyLevel = consistencyLevel;
this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository);
@ -134,6 +136,10 @@ public final class HelenusSession extends AbstractSessionOperations implements C
return valuePreparer;
}
public ConsistencyLevel getDefaultConsistencyLevel() {
return defaultConsistencyLevel;
}
public Metadata getMetadata() { return metadata; }
public synchronized UnitOfWork begin() {

View file

@ -15,19 +15,8 @@
*/
package net.helenus.core;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.datastax.driver.core.*;
import com.google.common.util.concurrent.MoreExecutors;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusEntityType;
import net.helenus.mapping.value.ColumnValuePreparer;
@ -35,235 +24,253 @@ import net.helenus.mapping.value.ColumnValueProvider;
import net.helenus.support.HelenusException;
import net.helenus.support.PackageUtil;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
public final class SessionInitializer extends AbstractSessionOperations {
private final Session session;
private CodecRegistry registry;
private String usingKeyspace;
private boolean showCql = false;
private PrintStream printStream = System.out;
private Executor executor = MoreExecutors.directExecutor();
private final Session session;
private CodecRegistry registry;
private String usingKeyspace;
private boolean showCql = false;
private ConsistencyLevel consistencyLevel;
private PrintStream printStream = System.out;
private Executor executor = MoreExecutors.directExecutor();
private SessionRepositoryBuilder sessionRepository;
private SessionRepositoryBuilder sessionRepository;
private boolean dropUnusedColumns = false;
private boolean dropUnusedIndexes = false;
private boolean dropUnusedColumns = false;
private boolean dropUnusedIndexes = false;
private KeyspaceMetadata keyspaceMetadata;
private KeyspaceMetadata keyspaceMetadata;
private final List<Object> initList = new ArrayList<Object>();
private AutoDdl autoDdl = AutoDdl.UPDATE;
private final List<Object> initList = new ArrayList<Object>();
private AutoDdl autoDdl = AutoDdl.UPDATE;
SessionInitializer(Session session) {
this.session = Objects.requireNonNull(session, "empty session");
this.usingKeyspace = session.getLoggedKeyspace(); // can be null
this.sessionRepository = new SessionRepositoryBuilder(session);
}
SessionInitializer(Session session) {
this.session = Objects.requireNonNull(session, "empty session");
this.usingKeyspace = session.getLoggedKeyspace(); // can be null
this.sessionRepository = new SessionRepositoryBuilder(session);
}
@Override
public Session currentSession() {
return session;
}
@Override
public Session currentSession() {
return session;
}
@Override
public String usingKeyspace() {
return usingKeyspace;
}
@Override
public String usingKeyspace() {
return usingKeyspace;
}
@Override
public Executor getExecutor() {
return executor;
}
@Override
public Executor getExecutor() {
return executor;
}
@Override
public SessionRepository getSessionRepository() {
throw new HelenusException("not expected to call");
}
@Override
public SessionRepository getSessionRepository() {
throw new HelenusException("not expected to call");
}
@Override
public ColumnValueProvider getValueProvider() {
throw new HelenusException("not expected to call");
}
@Override
public ColumnValueProvider getValueProvider() {
throw new HelenusException("not expected to call");
}
@Override
public ColumnValuePreparer getValuePreparer() {
throw new HelenusException("not expected to call");
}
@Override
public ColumnValuePreparer getValuePreparer() {
throw new HelenusException("not expected to call");
}
public SessionInitializer showCql() {
this.showCql = true;
return this;
}
public SessionInitializer showCql() {
this.showCql = true;
return this;
}
public SessionInitializer showCql(boolean enabled) {
this.showCql = enabled;
return this;
}
public SessionInitializer showCql(boolean enabled) {
this.showCql = enabled;
return this;
}
@Override
public PrintStream getPrintStream() {
return printStream;
}
public SessionInitializer consistencyLevel(ConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}
public SessionInitializer printTo(PrintStream out) {
this.printStream = out;
return this;
}
public ConsistencyLevel getDefaultConsistencyLevel() {
return consistencyLevel;
}
public SessionInitializer withExecutor(Executor executor) {
Objects.requireNonNull(executor, "empty executor");
this.executor = executor;
return this;
}
@Override
public PrintStream getPrintStream() {
return printStream;
}
public SessionInitializer withCachingExecutor() {
this.executor = Executors.newCachedThreadPool();
return this;
}
public SessionInitializer printTo(PrintStream out) {
this.printStream = out;
return this;
}
public SessionInitializer dropUnusedColumns(boolean enabled) {
this.dropUnusedColumns = enabled;
return this;
}
public SessionInitializer withExecutor(Executor executor) {
Objects.requireNonNull(executor, "empty executor");
this.executor = executor;
return this;
}
public SessionInitializer dropUnusedIndexes(boolean enabled) {
this.dropUnusedIndexes = enabled;
return this;
}
public SessionInitializer withCachingExecutor() {
this.executor = Executors.newCachedThreadPool();
return this;
}
public SessionInitializer dropUnusedColumns(boolean enabled) {
this.dropUnusedColumns = enabled;
return this;
}
public SessionInitializer dropUnusedIndexes(boolean enabled) {
this.dropUnusedIndexes = enabled;
return this;
}
public SessionInitializer withCodecRegistry(CodecRegistry registry) {
this.registry = registry;
this.registry = registry;
return this;
}
@Override
public boolean isShowCql() {
return showCql;
}
public boolean isShowCql() {
return showCql;
}
public SessionInitializer addPackage(String packageName) {
try {
PackageUtil.getClasses(packageName).stream().filter(c -> c.isInterface() && !c.isAnnotation())
.forEach(initList::add);
} catch (IOException | ClassNotFoundException e) {
throw new HelenusException("fail to add package " + packageName, e);
public SessionInitializer addPackage(String packageName) {
try {
PackageUtil.getClasses(packageName).stream().filter(c -> c.isInterface() && !c.isAnnotation())
.forEach(initList::add);
}
return this;
}
catch (IOException | ClassNotFoundException e) {
throw new HelenusException("fail to add package " + packageName, e);
}
return this;
}
public SessionInitializer add(Object... dsls) {
Objects.requireNonNull(dsls, "dsls is empty");
int len = dsls.length;
for (int i = 0; i != len; ++i) {
Object obj = Objects.requireNonNull(dsls[i], "element " + i + " is empty");
initList.add(obj);
}
return this;
}
public SessionInitializer add(Object... dsls) {
Objects.requireNonNull(dsls, "dsls is empty");
int len = dsls.length;
for (int i = 0; i != len; ++i) {
Object obj = Objects.requireNonNull(dsls[i], "element " + i + " is empty");
initList.add(obj);
}
return this;
}
public SessionInitializer autoValidate() {
this.autoDdl = AutoDdl.VALIDATE;
return this;
}
public SessionInitializer autoValidate() {
this.autoDdl = AutoDdl.VALIDATE;
return this;
}
public SessionInitializer autoUpdate() {
this.autoDdl = AutoDdl.UPDATE;
return this;
}
public SessionInitializer autoUpdate() {
this.autoDdl = AutoDdl.UPDATE;
return this;
}
public SessionInitializer autoCreate() {
this.autoDdl = AutoDdl.CREATE;
return this;
}
public SessionInitializer autoCreate() {
this.autoDdl = AutoDdl.CREATE;
return this;
}
public SessionInitializer autoCreateDrop() {
this.autoDdl = AutoDdl.CREATE_DROP;
return this;
}
public SessionInitializer autoCreateDrop() {
this.autoDdl = AutoDdl.CREATE_DROP;
return this;
}
public SessionInitializer auto(AutoDdl autoDdl) {
this.autoDdl = autoDdl;
return this;
}
public SessionInitializer auto(AutoDdl autoDdl) {
this.autoDdl = autoDdl;
return this;
}
public SessionInitializer use(String keyspace) {
session.execute(SchemaUtil.use(keyspace, false));
this.usingKeyspace = keyspace;
return this;
}
public SessionInitializer use(String keyspace) {
session.execute(SchemaUtil.use(keyspace, false));
this.usingKeyspace = keyspace;
return this;
}
public SessionInitializer use(String keyspace, boolean forceQuote) {
session.execute(SchemaUtil.use(keyspace, forceQuote));
this.usingKeyspace = keyspace;
return this;
}
public SessionInitializer use(String keyspace, boolean forceQuote) {
session.execute(SchemaUtil.use(keyspace, forceQuote));
this.usingKeyspace = keyspace;
return this;
}
public void singleton() {
public void singleton() {
Helenus.setSession(get());
}
Helenus.setSession(get());
}
public synchronized HelenusSession get() {
initialize();
return new HelenusSession(session, usingKeyspace, registry, showCql, printStream, sessionRepository, executor,
autoDdl == AutoDdl.CREATE_DROP);
}
public synchronized HelenusSession get() {
initialize();
return new HelenusSession(session, usingKeyspace, registry, showCql, printStream, sessionRepository, executor,
autoDdl == AutoDdl.CREATE_DROP, consistencyLevel);
}
private void initialize() {
private void initialize() {
Objects.requireNonNull(usingKeyspace, "please define keyspace by 'use' operator");
Objects.requireNonNull(usingKeyspace, "please define keyspace by 'use' operator");
initList.forEach(dsl -> sessionRepository.add(dsl));
initList.forEach(dsl -> sessionRepository.add(dsl));
TableOperations tableOps = new TableOperations(this, dropUnusedColumns, dropUnusedIndexes);
UserTypeOperations userTypeOps = new UserTypeOperations(this, dropUnusedColumns);
TableOperations tableOps = new TableOperations(this, dropUnusedColumns, dropUnusedIndexes);
UserTypeOperations userTypeOps = new UserTypeOperations(this, dropUnusedColumns);
switch (autoDdl) {
switch (autoDdl) {
case CREATE_DROP :
case CREATE_DROP:
// Drop tables first, otherwise a `DROP TYPE ...` will fail as the type is still referenced
// by a table.
sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE)
.forEach(e -> tableOps.dropTable(e));
// Drop tables first, otherwise a `DROP TYPE ...` will fail as the type is still referenced
// by a table.
sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE)
.forEach(e -> tableOps.dropTable(e));
eachUserTypeInReverseOrder(userTypeOps, e -> userTypeOps.dropUserType(e));
eachUserTypeInReverseOrder(userTypeOps, e -> userTypeOps.dropUserType(e));
// FALLTHRU to CREATE case (read: the absence of a `break;` statement here is intentional!)
case CREATE :
// FALLTHRU to CREATE case (read: the absence of a `break;` statement here is intentional!)
case CREATE:
eachUserTypeInOrder(userTypeOps, e -> userTypeOps.createUserType(e));
eachUserTypeInOrder(userTypeOps, e -> userTypeOps.createUserType(e));
sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE)
.forEach(e -> tableOps.createTable(e));
sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE)
.forEach(e -> tableOps.createTable(e));
break;
break;
case VALIDATE :
case VALIDATE:
eachUserTypeInOrder(userTypeOps, e -> userTypeOps.validateUserType(getUserType(e), e));
eachUserTypeInOrder(userTypeOps, e -> userTypeOps.validateUserType(getUserType(e), e));
sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE)
.forEach(e -> tableOps.validateTable(getTableMetadata(e), e));
break;
sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE)
.forEach(e -> tableOps.validateTable(getTableMetadata(e), e));
break;
case UPDATE :
case UPDATE:
eachUserTypeInOrder(userTypeOps, e -> userTypeOps.updateUserType(getUserType(e), e));
eachUserTypeInOrder(userTypeOps, e -> userTypeOps.updateUserType(getUserType(e), e));
sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE)
.forEach(e -> tableOps.updateTable(getTableMetadata(e), e));
break;
sessionRepository.entities().stream().filter(e -> e.getType() == HelenusEntityType.TABLE)
.forEach(e -> tableOps.updateTable(getTableMetadata(e), e));
break;
}
}
KeyspaceMetadata km = getKeyspaceMetadata();
KeyspaceMetadata km = getKeyspaceMetadata();
for (UserType userType : km.getUserTypes()) {
sessionRepository.addUserType(userType.getTypeName(), userType);
}
for (UserType userType : km.getUserTypes()) {
sessionRepository.addUserType(userType.getTypeName(), userType);
}
}
}
private void eachUserTypeInOrder(UserTypeOperations userTypeOps, Consumer<? super HelenusEntity> action) {
@ -278,9 +285,11 @@ public final class SessionInitializer extends AbstractSessionOperations {
}
private void eachUserTypeInReverseOrder(UserTypeOperations userTypeOps, Consumer<? super HelenusEntity> action) {
ArrayDeque<HelenusEntity> deque = new ArrayDeque<>();
eachUserTypeInOrder(userTypeOps, e -> deque.addFirst(e));
deque.stream().forEach(e -> {action.accept(e); });
ArrayDeque<HelenusEntity> deque = new ArrayDeque<>();
eachUserTypeInOrder(userTypeOps, e -> deque.addFirst(e));
deque.stream().forEach(e -> {
action.accept(e);
});
/*
Set<HelenusEntity> processedSet = new HashSet<HelenusEntity>();
Set<HelenusEntity> stack = new HashSet<HelenusEntity>();
@ -297,39 +306,39 @@ public final class SessionInitializer extends AbstractSessionOperations {
}
private void eachUserTypeInRecursion(HelenusEntity e, Set<HelenusEntity> processedSet, Set<HelenusEntity> stack,
UserTypeOperations userTypeOps, Consumer<? super HelenusEntity> action) {
UserTypeOperations userTypeOps, Consumer<? super HelenusEntity> action) {
stack.add(e);
stack.add(e);
Collection<HelenusEntity> createBefore = sessionRepository.getUserTypeUses(e);
Collection<HelenusEntity> createBefore = sessionRepository.getUserTypeUses(e);
for (HelenusEntity be : createBefore) {
if (!processedSet.contains(be) && !stack.contains(be)) {
eachUserTypeInRecursion(be, processedSet, stack, userTypeOps, action);
processedSet.add(be);
}
}
for (HelenusEntity be : createBefore) {
if (!processedSet.contains(be) && !stack.contains(be)) {
eachUserTypeInRecursion(be, processedSet, stack, userTypeOps, action);
processedSet.add(be);
}
}
if (!processedSet.contains(e)) {
action.accept(e);
processedSet.add(e);
}
if (!processedSet.contains(e)) {
action.accept(e);
processedSet.add(e);
}
}
}
private KeyspaceMetadata getKeyspaceMetadata() {
if (keyspaceMetadata == null) {
keyspaceMetadata = session.getCluster().getMetadata().getKeyspace(usingKeyspace.toLowerCase());
}
return keyspaceMetadata;
}
private KeyspaceMetadata getKeyspaceMetadata() {
if (keyspaceMetadata == null) {
keyspaceMetadata = session.getCluster().getMetadata().getKeyspace(usingKeyspace.toLowerCase());
}
return keyspaceMetadata;
}
private TableMetadata getTableMetadata(HelenusEntity entity) {
return getKeyspaceMetadata().getTable(entity.getName().getName());
private TableMetadata getTableMetadata(HelenusEntity entity) {
return getKeyspaceMetadata().getTable(entity.getName().getName());
}
}
private UserType getUserType(HelenusEntity entity) {
return getKeyspaceMetadata().getUserType(entity.getName().getName());
}
private UserType getUserType(HelenusEntity entity) {
return getKeyspaceMetadata().getUserType(entity.getName().getName());
}
}

View file

@ -52,6 +52,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
public AbstractStatementOperation(AbstractSessionOperations sessionOperations) {
this.sessionOps = sessionOperations;
this.consistencyLevel = sessionOperations.getDefaultConsistencyLevel();
}
public O showValues(boolean enabled) {
@ -100,17 +101,32 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
return (O) this;
}
public O consistencyQuorum() {
this.consistencyLevel = ConsistencyLevel.QUORUM;
return (O) this;
}
public O consistencyQuorum() {
this.consistencyLevel = ConsistencyLevel.QUORUM;
return (O) this;
}
public O consistencyAll() {
this.consistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O serialConsistency(ConsistencyLevel level) {
public O consistencyLocalOne() {
this.consistencyLevel = ConsistencyLevel.LOCAL_ONE;
return (O) this;
}
public O consistencyLocalQuorum() {
this.consistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
return (O) this;
}
public O consistencyEachQuorum() {
this.consistencyLevel = ConsistencyLevel.EACH_QUORUM;
return (O) this;
}
public O serialConsistency(ConsistencyLevel level) {
this.serialConsistencyLevel = level;
return (O) this;
}
@ -130,10 +146,20 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
return (O) this;
}
public O serialConsistencyAll() {
this.serialConsistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O serialConsistencyAll() {
this.serialConsistencyLevel = ConsistencyLevel.ALL;
return (O) this;
}
public O serialConsistencyLocal() {
this.serialConsistencyLevel = ConsistencyLevel.LOCAL_SERIAL;
return (O) this;
}
public O serialConsistencyLocalQuorum() {
this.serialConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM;
return (O) this;
}
public O disableTracing() {
this.enableTracing = false;