WIP: started with wal idea, shifting to accounting table.

This commit is contained in:
Greg Burd 2017-11-09 12:05:09 -05:00
parent b4dca9c710
commit ad9261fda3
9 changed files with 209 additions and 26 deletions

View file

@ -149,4 +149,7 @@ public abstract class AbstractSessionOperations {
} }
public void cacheEvict(List<Facet> facets) {} public void cacheEvict(List<Facet> facets) {}
public boolean obscureValuesInQueries() { return false; }
} }

View file

@ -15,19 +15,12 @@
*/ */
package net.helenus.core; package net.helenus.core;
import static net.helenus.core.HelenusSession.deleted; import com.datastax.driver.core.utils.UUIDs;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ResultSet;
import com.diffplug.common.base.Errors; import com.diffplug.common.base.Errors;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.HashBasedTable; import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table; import com.google.common.collect.Table;
import com.google.common.collect.TreeTraverser; import com.google.common.collect.TreeTraverser;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet; import net.helenus.core.cache.Facet;
import net.helenus.core.operation.AbstractOperation; import net.helenus.core.operation.AbstractOperation;
@ -36,6 +29,16 @@ import net.helenus.support.Either;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static net.helenus.core.HelenusSession.deleted;
import static net.helenus.core.Query.gte;
import static net.helenus.core.Query.lt;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */ /** Encapsulates the concept of a "transaction" as a unit-of-work. */
public abstract class AbstractUnitOfWork<E extends Exception> public abstract class AbstractUnitOfWork<E extends Exception>
implements UnitOfWork<E>, AutoCloseable { implements UnitOfWork<E>, AutoCloseable {
@ -45,6 +48,7 @@ public abstract class AbstractUnitOfWork<E extends Exception>
private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>(); private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>();
private final HelenusSession session; private final HelenusSession session;
private final AbstractUnitOfWork<E> parent; private final AbstractUnitOfWork<E> parent;
private final UUID id;
private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create(); private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
protected String purpose; protected String purpose;
protected List<String> nestedPurposes = new ArrayList<String>(); protected List<String> nestedPurposes = new ArrayList<String>();
@ -60,12 +64,16 @@ public abstract class AbstractUnitOfWork<E extends Exception>
private boolean committed = false; private boolean committed = false;
private long committedAt = 0L; private long committedAt = 0L;
private BatchOperation batch; private BatchOperation batch;
private WriteAheadLog wal;
private long oldestInFlight;
protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork<E> parent) { protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork<E> parent) {
Objects.requireNonNull(session, "containing session cannot be null"); Objects.requireNonNull(session, "containing session cannot be null");
this.session = session; this.session = session;
this.parent = parent; this.parent = parent;
this.id = UUIDs.random();
this.wal = Helenus.dsl(WriteAheadLog.class);
} }
@Override @Override
@ -95,7 +103,25 @@ public abstract class AbstractUnitOfWork<E extends Exception>
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
elapsedTime = Stopwatch.createStarted(); elapsedTime = Stopwatch.createStarted();
} }
// log.record(txn::start)
UUID seq = UUIDs.timeBased();
try {
session.upsert(WriteAheadLog.class)
.value(wal::lsn, seq)
.value(wal::uow, id)
.value(wal::type, WriteAheadLog.Type.BEGIN)
.usingTimestamp(seq.timestamp())
.consistencyQuorum()
.sync();
} catch(TimeoutException e) {
aborted = true;
}
/* TODO(gburd): find oldest BEGIN record between: time[now] and time[now] - txn timeout (e.g. 5m ago)
that doesn't have a corresponding COMMITTED or ABORTED record. Save that UUID for later. */
oldestInFlight = seq.timestamp() - (5 * 60 * 1000 * 1000 * 100);
Date d = new Date(oldestInFlight);
return this; return this;
} }
@ -292,7 +318,6 @@ public abstract class AbstractUnitOfWork<E extends Exception>
if (batch != null) { if (batch != null) {
committedAt = batch.sync(this); committedAt = batch.sync(this);
//TODO(gburd) update cache with writeTime...
} }
// All nested UnitOfWork should be committed (not aborted) before calls to // All nested UnitOfWork should be committed (not aborted) before calls to
@ -306,10 +331,48 @@ public abstract class AbstractUnitOfWork<E extends Exception>
} }
} }
// log.record(txn::provisionalCommit) // 1. log.record(txn::provisionalCommit)
UUID seqPrep = UUIDs.timeBased();
try {
UUID seq = seqPrep;
session.upsert(WriteAheadLog.class)
.value(wal::lsn, seq)
.value(wal::uow, id)
.value(wal::type, WriteAheadLog.Type.PREPARED)
.usingTimestamp(seq.timestamp())
.consistencyQuorum()
.sync();
} catch (TimeoutException e) {
canCommit = false;
aborted = true;
}
// 2. fetch log records between oldest in-flight txn begin and
Stream<WriteAheadLog> recs = session.select(WriteAheadLog.class)
.where(wal::lsn, gte(oldestInFlight))
.and(wal::lsn, lt(seqPrep))
.consistencyQuorum()
.sync();
// examine log for conflicts in read-set and write-set between begin and // examine log for conflicts in read-set and write-set between begin and
// provisional commit // provisional commit
// if (conflict) { throw new ConflictingUnitOfWorkException(this) } // if (conflict) { throw new ConflictingUnitOfWorkException(this) }
UUID seqCommit = UUIDs.timeBased();
try {
UUID seq = seqCommit;
session.upsert(WriteAheadLog.class)
.value(wal::lsn, seq)
.value(wal::uow, id)
.value(wal::type, WriteAheadLog.Type.COMMITTED)
.usingTimestamp(seq.timestamp())
.consistencyQuorum()
.sync();
} catch (TimeoutException e) {
canCommit = false;
aborted = true;
}
// else return function so as to enable commit.andThen(() -> { do something iff // else return function so as to enable commit.andThen(() -> { do something iff
// commit was successful; }) // commit was successful; })

View file

@ -72,6 +72,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
private final RowColumnValueProvider valueProvider; private final RowColumnValueProvider valueProvider;
private final StatementColumnValuePreparer valuePreparer; private final StatementColumnValuePreparer valuePreparer;
private final Metadata metadata; private final Metadata metadata;
private final boolean obscureValuesInQueries;
private volatile String usingKeyspace; private volatile String usingKeyspace;
private volatile boolean showCql; private volatile boolean showCql;
@ -80,6 +81,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
String usingKeyspace, String usingKeyspace,
CodecRegistry registry, CodecRegistry registry,
boolean showCql, boolean showCql,
boolean obscureValues,
PrintStream printStream, PrintStream printStream,
SessionRepositoryBuilder sessionRepositoryBuilder, SessionRepositoryBuilder sessionRepositoryBuilder,
Executor executor, Executor executor,
@ -96,6 +98,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
Objects.requireNonNull( Objects.requireNonNull(
usingKeyspace, "keyspace needs to be selected before creating session"); usingKeyspace, "keyspace needs to be selected before creating session");
this.showCql = showCql; this.showCql = showCql;
this.obscureValuesInQueries = obscureValues;
this.printStream = printStream; this.printStream = printStream;
this.sessionRepository = sessionRepositoryBuilder.build(); this.sessionRepository = sessionRepositoryBuilder.build();
this.executor = executor; this.executor = executor;
@ -133,6 +136,9 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
return this; return this;
} }
@Override
public boolean obscureValuesInQueries() { return obscureValuesInQueries; }
@Override @Override
public boolean isShowCql() { public boolean isShowCql() {
return showCql; return showCql;

View file

@ -25,6 +25,8 @@ import java.util.*;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.function.Consumer; import java.util.function.Consumer;
import net.helenus.core.annotation.Transactional;
import net.helenus.core.cache.SessionCache; import net.helenus.core.cache.SessionCache;
import net.helenus.core.reflect.DslExportable; import net.helenus.core.reflect.DslExportable;
import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.HelenusEntity;
@ -56,6 +58,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
private KeyspaceMetadata keyspaceMetadata; private KeyspaceMetadata keyspaceMetadata;
private AutoDdl autoDdl = AutoDdl.UPDATE; private AutoDdl autoDdl = AutoDdl.UPDATE;
private SessionCache sessionCache = null; private SessionCache sessionCache = null;
private boolean obscureValues = true;
SessionInitializer(Session session) { SessionInitializer(Session session) {
this.session = Objects.requireNonNull(session, "empty session"); this.session = Objects.requireNonNull(session, "empty session");
@ -103,6 +106,16 @@ public final class SessionInitializer extends AbstractSessionOperations {
return this; return this;
} }
public SessionInitializer obscureValuesWhenLoggingQueries() {
this.obscureValues = true;
return this;
}
public SessionInitializer obscureValuesWhenLoggingQueries(boolean obscureValues) {
this.obscureValues = obscureValues;
return this;
}
public SessionInitializer metricRegistry(MetricRegistry metricRegistry) { public SessionInitializer metricRegistry(MetricRegistry metricRegistry) {
this.metricRegistry = metricRegistry; this.metricRegistry = metricRegistry;
return this; return this;
@ -255,6 +268,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
usingKeyspace, usingKeyspace,
registry, registry,
showCql, showCql,
obscureValues,
printStream, printStream,
sessionRepository, sessionRepository,
executor, executor,
@ -271,19 +285,30 @@ public final class SessionInitializer extends AbstractSessionOperations {
Objects.requireNonNull(usingKeyspace, "please define keyspace by 'use' operator"); Objects.requireNonNull(usingKeyspace, "please define keyspace by 'use' operator");
initList.forEach( boolean transactionalEntities = false;
(either) -> { HelenusEntity walEntity = null;
Class<?> iface = null;
if (either.isLeft()) { for (Either<Object, Class<?>> either : initList) {
iface = MappingUtil.getMappingInterface(either.getLeft()); Class<?> iface = null;
} else { if (either.isLeft()) {
iface = either.getRight(); iface = MappingUtil.getMappingInterface(either.getLeft());
} } else {
iface = either.getRight();
}
DslExportable dsl = (DslExportable) Helenus.dsl(iface);
dsl.setCassandraMetadataForHelenusSession(session.getCluster().getMetadata());
sessionRepository.add(dsl);
if (!transactionalEntities && iface.getDeclaredAnnotation(Transactional.class) != null) {
transactionalEntities = true;
dsl = (DslExportable) Helenus.dsl(WriteAheadLog.class);
dsl.setCassandraMetadataForHelenusSession(session.getCluster().getMetadata());
walEntity = dsl.getHelenusMappingEntity();
sessionRepository.add(dsl);
}
}
DslExportable dsl = (DslExportable) Helenus.dsl(iface);
dsl.setCassandraMetadataForHelenusSession(session.getCluster().getMetadata());
sessionRepository.add(dsl);
});
TableOperations tableOps = new TableOperations(this, dropUnusedColumns, dropUnusedIndexes); TableOperations tableOps = new TableOperations(this, dropUnusedColumns, dropUnusedIndexes);
UserTypeOperations userTypeOps = new UserTypeOperations(this, dropUnusedColumns); UserTypeOperations userTypeOps = new UserTypeOperations(this, dropUnusedColumns);
@ -311,6 +336,10 @@ public final class SessionInitializer extends AbstractSessionOperations {
eachUserTypeInReverseOrder(userTypeOps, e -> userTypeOps.dropUserType(e)); eachUserTypeInReverseOrder(userTypeOps, e -> userTypeOps.dropUserType(e));
if (transactionalEntities) {
tableOps.dropTable(walEntity);
}
// FALLTHRU to CREATE case (read: the absence of a `break;` statement here is // FALLTHRU to CREATE case (read: the absence of a `break;` statement here is
// intentional!) // intentional!)
case CREATE: case CREATE:
@ -328,6 +357,9 @@ public final class SessionInitializer extends AbstractSessionOperations {
.filter(e -> e.getType() == HelenusEntityType.VIEW) .filter(e -> e.getType() == HelenusEntityType.VIEW)
.forEach(e -> tableOps.createView(e)); .forEach(e -> tableOps.createView(e));
if (transactionalEntities) {
tableOps.createTable(walEntity);
}
break; break;
case VALIDATE: case VALIDATE:
@ -339,6 +371,9 @@ public final class SessionInitializer extends AbstractSessionOperations {
.filter(e -> e.getType() == HelenusEntityType.TABLE) .filter(e -> e.getType() == HelenusEntityType.TABLE)
.forEach(e -> tableOps.validateTable(getTableMetadata(e), e)); .forEach(e -> tableOps.validateTable(getTableMetadata(e), e));
if (transactionalEntities) {
tableOps.validateTable(getTableMetadata(walEntity), walEntity);
}
break; break;
case UPDATE: case UPDATE:
@ -361,6 +396,10 @@ public final class SessionInitializer extends AbstractSessionOperations {
.stream() .stream()
.filter(e -> e.getType() == HelenusEntityType.VIEW) .filter(e -> e.getType() == HelenusEntityType.VIEW)
.forEach(e -> tableOps.createView(e)); .forEach(e -> tableOps.createView(e));
if (transactionalEntities) {
tableOps.updateTable(getTableMetadata(walEntity), walEntity);
}
break; break;
} }

View file

@ -0,0 +1,44 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core;
import com.datastax.driver.core.DataType;
import net.helenus.mapping.annotation.*;
import java.util.Date;
import java.util.Set;
import java.util.UUID;
@Table(value="wal")
public interface WriteAheadLog {
enum Type { BEGIN, PREPARED, COMMITTED, ABORTED }
@PartitionKey
@Types.Timestamp
Date lsn();
@Column(value="id")
UUID uow();
@Column
default Type type() { return Type.BEGIN; }
@Column
@Index
@Types.Set(DataType.Name.TEXT)
Set<String> mutations();
}

View file

@ -0,0 +1,25 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Transactional {}

View file

@ -144,12 +144,12 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
@Override @Override
public BuiltStatement buildStatement(boolean cached) { public BuiltStatement buildStatement(boolean cached) {
List<HelenusEntity> entities = values.stream().map(t -> t._1.getProperty().getEntity()).distinct().collect(Collectors.toList()); List<Class<?>> entities = values.stream().map(t -> t._1.getProperty().getEntity().getMappingInterface()).distinct().collect(Collectors.toList());
if (entities.size() != 1) { if (entities.size() != 1) {
throw new HelenusMappingException("you can insert only single entity at a time, found: " throw new HelenusMappingException("you can insert only single entity at a time, found: "
+ entities.stream().map(e -> e.getMappingInterface().toString()).collect(Collectors.joining(", "))); + entities.stream().map(e -> e.toString()).collect(Collectors.joining(", ")));
} }
HelenusEntity entity = entities.get(0); HelenusEntity entity = values.get(0)._1.getEntity();
if (this.entity != null) { if (this.entity != null) {
if (this.entity != entity) { if (this.entity != entity) {
throw new HelenusMappingException("you can insert only single entity at a time, found: " + throw new HelenusMappingException("you can insert only single entity at a time, found: " +

View file

@ -56,6 +56,7 @@ public abstract class Operation<E> {
Operation(AbstractSessionOperations sessionOperations) { Operation(AbstractSessionOperations sessionOperations) {
this.sessionOps = sessionOperations; this.sessionOps = sessionOperations;
this.showValues = !sessionOps.obscureValuesInQueries();
MetricRegistry metrics = sessionOperations.getMetricRegistry(); MetricRegistry metrics = sessionOperations.getMetricRegistry();
if (metrics == null) { if (metrics == null) {
metrics = new MetricRegistry(); metrics = new MetricRegistry();

View file

@ -27,6 +27,7 @@ import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession; import net.helenus.core.HelenusSession;
import net.helenus.core.UnitOfWork; import net.helenus.core.UnitOfWork;
import net.helenus.core.annotation.Cacheable; import net.helenus.core.annotation.Cacheable;
import net.helenus.core.annotation.Transactional;
import net.helenus.core.reflect.Entity; import net.helenus.core.reflect.Entity;
import net.helenus.mapping.annotation.Constraints; import net.helenus.mapping.annotation.Constraints;
import net.helenus.mapping.annotation.Index; import net.helenus.mapping.annotation.Index;
@ -39,6 +40,7 @@ import org.junit.Test;
@Table @Table
@Cacheable @Cacheable
@Transactional
interface Widget extends Entity { interface Widget extends Entity {
@PartitionKey @PartitionKey
UUID id(); UUID id();