From ad9261fda3e2406229f4db76f5f52c43b932abab Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Thu, 9 Nov 2017 12:05:09 -0500 Subject: [PATCH] WIP: started with wal idea, shifting to accounting table. --- .../core/AbstractSessionOperations.java | 3 + .../net/helenus/core/AbstractUnitOfWork.java | 85 ++++++++++++++++--- .../java/net/helenus/core/HelenusSession.java | 6 ++ .../net/helenus/core/SessionInitializer.java | 63 +++++++++++--- .../java/net/helenus/core/WriteAheadLog.java | 44 ++++++++++ .../core/annotation/Transactional.java | 25 ++++++ .../core/operation/InsertOperation.java | 6 +- .../net/helenus/core/operation/Operation.java | 1 + .../core/unitofwork/UnitOfWorkTest.java | 2 + 9 files changed, 209 insertions(+), 26 deletions(-) create mode 100644 src/main/java/net/helenus/core/WriteAheadLog.java create mode 100644 src/main/java/net/helenus/core/annotation/Transactional.java diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index 9846509..a232bed 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -149,4 +149,7 @@ public abstract class AbstractSessionOperations { } public void cacheEvict(List facets) {} + + public boolean obscureValuesInQueries() { return false; } + } diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index af4cf65..26b816b 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -15,19 +15,12 @@ */ package net.helenus.core; -import static net.helenus.core.HelenusSession.deleted; - -import com.datastax.driver.core.BatchStatement; -import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.utils.UUIDs; import com.diffplug.common.base.Errors; import com.google.common.base.Stopwatch; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; import com.google.common.collect.TreeTraverser; -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; import net.helenus.core.cache.CacheUtil; import net.helenus.core.cache.Facet; import net.helenus.core.operation.AbstractOperation; @@ -36,6 +29,16 @@ import net.helenus.support.Either; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static net.helenus.core.HelenusSession.deleted; +import static net.helenus.core.Query.gte; +import static net.helenus.core.Query.lt; + /** Encapsulates the concept of a "transaction" as a unit-of-work. */ public abstract class AbstractUnitOfWork implements UnitOfWork, AutoCloseable { @@ -45,6 +48,7 @@ public abstract class AbstractUnitOfWork private final List> nested = new ArrayList<>(); private final HelenusSession session; private final AbstractUnitOfWork parent; + private final UUID id; private final Table>> cache = HashBasedTable.create(); protected String purpose; protected List nestedPurposes = new ArrayList(); @@ -60,12 +64,16 @@ public abstract class AbstractUnitOfWork private boolean committed = false; private long committedAt = 0L; private BatchOperation batch; + private WriteAheadLog wal; + private long oldestInFlight; protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork parent) { Objects.requireNonNull(session, "containing session cannot be null"); this.session = session; this.parent = parent; + this.id = UUIDs.random(); + this.wal = Helenus.dsl(WriteAheadLog.class); } @Override @@ -95,7 +103,25 @@ public abstract class AbstractUnitOfWork if (LOG.isInfoEnabled()) { elapsedTime = Stopwatch.createStarted(); } - // log.record(txn::start) + + UUID seq = UUIDs.timeBased(); + try { + session.upsert(WriteAheadLog.class) + .value(wal::lsn, seq) + .value(wal::uow, id) + .value(wal::type, WriteAheadLog.Type.BEGIN) + .usingTimestamp(seq.timestamp()) + .consistencyQuorum() + .sync(); + } catch(TimeoutException e) { + aborted = true; + } + + /* TODO(gburd): find oldest BEGIN record between: time[now] and time[now] - txn timeout (e.g. 5m ago) + that doesn't have a corresponding COMMITTED or ABORTED record. Save that UUID for later. */ + oldestInFlight = seq.timestamp() - (5 * 60 * 1000 * 1000 * 100); + Date d = new Date(oldestInFlight); + return this; } @@ -292,7 +318,6 @@ public abstract class AbstractUnitOfWork if (batch != null) { committedAt = batch.sync(this); - //TODO(gburd) update cache with writeTime... } // All nested UnitOfWork should be committed (not aborted) before calls to @@ -306,10 +331,48 @@ public abstract class AbstractUnitOfWork } } - // log.record(txn::provisionalCommit) + // 1. log.record(txn::provisionalCommit) + UUID seqPrep = UUIDs.timeBased(); + try { + UUID seq = seqPrep; + session.upsert(WriteAheadLog.class) + .value(wal::lsn, seq) + .value(wal::uow, id) + .value(wal::type, WriteAheadLog.Type.PREPARED) + .usingTimestamp(seq.timestamp()) + .consistencyQuorum() + .sync(); + } catch (TimeoutException e) { + canCommit = false; + aborted = true; + } + + // 2. fetch log records between oldest in-flight txn begin and + Stream recs = session.select(WriteAheadLog.class) + .where(wal::lsn, gte(oldestInFlight)) + .and(wal::lsn, lt(seqPrep)) + .consistencyQuorum() + .sync(); + // examine log for conflicts in read-set and write-set between begin and // provisional commit // if (conflict) { throw new ConflictingUnitOfWorkException(this) } + + UUID seqCommit = UUIDs.timeBased(); + try { + UUID seq = seqCommit; + session.upsert(WriteAheadLog.class) + .value(wal::lsn, seq) + .value(wal::uow, id) + .value(wal::type, WriteAheadLog.Type.COMMITTED) + .usingTimestamp(seq.timestamp()) + .consistencyQuorum() + .sync(); + } catch (TimeoutException e) { + canCommit = false; + aborted = true; + } + // else return function so as to enable commit.andThen(() -> { do something iff // commit was successful; }) diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index c3e212f..4e02e06 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -72,6 +72,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab private final RowColumnValueProvider valueProvider; private final StatementColumnValuePreparer valuePreparer; private final Metadata metadata; + private final boolean obscureValuesInQueries; private volatile String usingKeyspace; private volatile boolean showCql; @@ -80,6 +81,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab String usingKeyspace, CodecRegistry registry, boolean showCql, + boolean obscureValues, PrintStream printStream, SessionRepositoryBuilder sessionRepositoryBuilder, Executor executor, @@ -96,6 +98,7 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab Objects.requireNonNull( usingKeyspace, "keyspace needs to be selected before creating session"); this.showCql = showCql; + this.obscureValuesInQueries = obscureValues; this.printStream = printStream; this.sessionRepository = sessionRepositoryBuilder.build(); this.executor = executor; @@ -133,6 +136,9 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab return this; } + @Override + public boolean obscureValuesInQueries() { return obscureValuesInQueries; } + @Override public boolean isShowCql() { return showCql; diff --git a/src/main/java/net/helenus/core/SessionInitializer.java b/src/main/java/net/helenus/core/SessionInitializer.java index c77379d..4976456 100644 --- a/src/main/java/net/helenus/core/SessionInitializer.java +++ b/src/main/java/net/helenus/core/SessionInitializer.java @@ -25,6 +25,8 @@ import java.util.*; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.function.Consumer; + +import net.helenus.core.annotation.Transactional; import net.helenus.core.cache.SessionCache; import net.helenus.core.reflect.DslExportable; import net.helenus.mapping.HelenusEntity; @@ -56,6 +58,7 @@ public final class SessionInitializer extends AbstractSessionOperations { private KeyspaceMetadata keyspaceMetadata; private AutoDdl autoDdl = AutoDdl.UPDATE; private SessionCache sessionCache = null; + private boolean obscureValues = true; SessionInitializer(Session session) { this.session = Objects.requireNonNull(session, "empty session"); @@ -103,6 +106,16 @@ public final class SessionInitializer extends AbstractSessionOperations { return this; } + public SessionInitializer obscureValuesWhenLoggingQueries() { + this.obscureValues = true; + return this; + } + + public SessionInitializer obscureValuesWhenLoggingQueries(boolean obscureValues) { + this.obscureValues = obscureValues; + return this; + } + public SessionInitializer metricRegistry(MetricRegistry metricRegistry) { this.metricRegistry = metricRegistry; return this; @@ -255,6 +268,7 @@ public final class SessionInitializer extends AbstractSessionOperations { usingKeyspace, registry, showCql, + obscureValues, printStream, sessionRepository, executor, @@ -271,19 +285,30 @@ public final class SessionInitializer extends AbstractSessionOperations { Objects.requireNonNull(usingKeyspace, "please define keyspace by 'use' operator"); - initList.forEach( - (either) -> { - Class iface = null; - if (either.isLeft()) { - iface = MappingUtil.getMappingInterface(either.getLeft()); - } else { - iface = either.getRight(); - } + boolean transactionalEntities = false; + HelenusEntity walEntity = null; + + for (Either> either : initList) { + Class iface = null; + if (either.isLeft()) { + iface = MappingUtil.getMappingInterface(either.getLeft()); + } else { + iface = either.getRight(); + } + + DslExportable dsl = (DslExportable) Helenus.dsl(iface); + dsl.setCassandraMetadataForHelenusSession(session.getCluster().getMetadata()); + sessionRepository.add(dsl); + + if (!transactionalEntities && iface.getDeclaredAnnotation(Transactional.class) != null) { + transactionalEntities = true; + dsl = (DslExportable) Helenus.dsl(WriteAheadLog.class); + dsl.setCassandraMetadataForHelenusSession(session.getCluster().getMetadata()); + walEntity = dsl.getHelenusMappingEntity(); + sessionRepository.add(dsl); + } + } - DslExportable dsl = (DslExportable) Helenus.dsl(iface); - dsl.setCassandraMetadataForHelenusSession(session.getCluster().getMetadata()); - sessionRepository.add(dsl); - }); TableOperations tableOps = new TableOperations(this, dropUnusedColumns, dropUnusedIndexes); UserTypeOperations userTypeOps = new UserTypeOperations(this, dropUnusedColumns); @@ -311,6 +336,10 @@ public final class SessionInitializer extends AbstractSessionOperations { eachUserTypeInReverseOrder(userTypeOps, e -> userTypeOps.dropUserType(e)); + if (transactionalEntities) { + tableOps.dropTable(walEntity); + } + // FALLTHRU to CREATE case (read: the absence of a `break;` statement here is // intentional!) case CREATE: @@ -328,6 +357,9 @@ public final class SessionInitializer extends AbstractSessionOperations { .filter(e -> e.getType() == HelenusEntityType.VIEW) .forEach(e -> tableOps.createView(e)); + if (transactionalEntities) { + tableOps.createTable(walEntity); + } break; case VALIDATE: @@ -339,6 +371,9 @@ public final class SessionInitializer extends AbstractSessionOperations { .filter(e -> e.getType() == HelenusEntityType.TABLE) .forEach(e -> tableOps.validateTable(getTableMetadata(e), e)); + if (transactionalEntities) { + tableOps.validateTable(getTableMetadata(walEntity), walEntity); + } break; case UPDATE: @@ -361,6 +396,10 @@ public final class SessionInitializer extends AbstractSessionOperations { .stream() .filter(e -> e.getType() == HelenusEntityType.VIEW) .forEach(e -> tableOps.createView(e)); + + if (transactionalEntities) { + tableOps.updateTable(getTableMetadata(walEntity), walEntity); + } break; } diff --git a/src/main/java/net/helenus/core/WriteAheadLog.java b/src/main/java/net/helenus/core/WriteAheadLog.java new file mode 100644 index 0000000..e427300 --- /dev/null +++ b/src/main/java/net/helenus/core/WriteAheadLog.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.core; + +import com.datastax.driver.core.DataType; +import net.helenus.mapping.annotation.*; + +import java.util.Date; +import java.util.Set; +import java.util.UUID; + +@Table(value="wal") +public interface WriteAheadLog { + + enum Type { BEGIN, PREPARED, COMMITTED, ABORTED } + + @PartitionKey + @Types.Timestamp + Date lsn(); + + @Column(value="id") + UUID uow(); + + @Column + default Type type() { return Type.BEGIN; } + + @Column + @Index + @Types.Set(DataType.Name.TEXT) + Set mutations(); +} diff --git a/src/main/java/net/helenus/core/annotation/Transactional.java b/src/main/java/net/helenus/core/annotation/Transactional.java new file mode 100644 index 0000000..b7c1df6 --- /dev/null +++ b/src/main/java/net/helenus/core/annotation/Transactional.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2015 The Helenus Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.helenus.core.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface Transactional {} diff --git a/src/main/java/net/helenus/core/operation/InsertOperation.java b/src/main/java/net/helenus/core/operation/InsertOperation.java index 4f2d0d2..688625d 100644 --- a/src/main/java/net/helenus/core/operation/InsertOperation.java +++ b/src/main/java/net/helenus/core/operation/InsertOperation.java @@ -144,12 +144,12 @@ public final class InsertOperation extends AbstractOperation entities = values.stream().map(t -> t._1.getProperty().getEntity()).distinct().collect(Collectors.toList()); + List> entities = values.stream().map(t -> t._1.getProperty().getEntity().getMappingInterface()).distinct().collect(Collectors.toList()); if (entities.size() != 1) { throw new HelenusMappingException("you can insert only single entity at a time, found: " - + entities.stream().map(e -> e.getMappingInterface().toString()).collect(Collectors.joining(", "))); + + entities.stream().map(e -> e.toString()).collect(Collectors.joining(", "))); } - HelenusEntity entity = entities.get(0); + HelenusEntity entity = values.get(0)._1.getEntity(); if (this.entity != null) { if (this.entity != entity) { throw new HelenusMappingException("you can insert only single entity at a time, found: " + diff --git a/src/main/java/net/helenus/core/operation/Operation.java b/src/main/java/net/helenus/core/operation/Operation.java index 1f115e7..73018a6 100644 --- a/src/main/java/net/helenus/core/operation/Operation.java +++ b/src/main/java/net/helenus/core/operation/Operation.java @@ -56,6 +56,7 @@ public abstract class Operation { Operation(AbstractSessionOperations sessionOperations) { this.sessionOps = sessionOperations; + this.showValues = !sessionOps.obscureValuesInQueries(); MetricRegistry metrics = sessionOperations.getMetricRegistry(); if (metrics == null) { metrics = new MetricRegistry(); diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java index 3615bac..62b580e 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -27,6 +27,7 @@ import net.helenus.core.Helenus; import net.helenus.core.HelenusSession; import net.helenus.core.UnitOfWork; import net.helenus.core.annotation.Cacheable; +import net.helenus.core.annotation.Transactional; import net.helenus.core.reflect.Entity; import net.helenus.mapping.annotation.Constraints; import net.helenus.mapping.annotation.Index; @@ -39,6 +40,7 @@ import org.junit.Test; @Table @Cacheable +@Transactional interface Widget extends Entity { @PartitionKey UUID id();