Fix a few things.
This commit is contained in:
parent
c025dc35a7
commit
39a8643103
6 changed files with 32 additions and 33 deletions
|
@ -245,18 +245,19 @@ public abstract class AbstractUnitOfWork<E extends Exception>
|
||||||
Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets);
|
Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets);
|
||||||
String tableName = CacheUtil.schemaName(facets);
|
String tableName = CacheUtil.schemaName(facets);
|
||||||
Optional<Object> optionalValue = cacheLookup(facets);
|
Optional<Object> optionalValue = cacheLookup(facets);
|
||||||
|
|
||||||
|
for (Facet facet : facets) {
|
||||||
|
if (!facet.fixed()) {
|
||||||
|
String columnKey = facet.name() + "==" + facet.value();
|
||||||
|
// mark the value identified by the facet to `deleted`
|
||||||
|
cache.put(tableName, columnKey, deletedObjectFacets);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now, look for other row/col pairs that referenced the same object, mark them
|
||||||
|
// `deleted` if the cache had a value before we added the deleted marker objects.
|
||||||
if (optionalValue.isPresent()) {
|
if (optionalValue.isPresent()) {
|
||||||
Object value = optionalValue.get();
|
Object value = optionalValue.get();
|
||||||
|
|
||||||
for (Facet facet : facets) {
|
|
||||||
if (!facet.fixed()) {
|
|
||||||
String columnKey = facet.name() + "==" + facet.value();
|
|
||||||
// mark the value identified by the facet to `deleted`
|
|
||||||
cache.put(tableName, columnKey, deletedObjectFacets);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// look for other row/col pairs that referenced the same object, mark them
|
|
||||||
// `deleted`
|
|
||||||
cache
|
cache
|
||||||
.columnKeySet()
|
.columnKeySet()
|
||||||
.forEach(
|
.forEach(
|
||||||
|
@ -326,13 +327,6 @@ public abstract class AbstractUnitOfWork<E extends Exception>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// log.record(txn::provisionalCommit)
|
|
||||||
// examine log for conflicts in read-set and write-set between begin and
|
|
||||||
// provisional commit
|
|
||||||
// if (conflict) { throw new ConflictingUnitOfWorkException(this) }
|
|
||||||
// else return function so as to enable commit.andThen(() -> { do something iff
|
|
||||||
// commit was successful; })
|
|
||||||
|
|
||||||
if (canCommit) {
|
if (canCommit) {
|
||||||
committed = true;
|
committed = true;
|
||||||
aborted = false;
|
aborted = false;
|
||||||
|
|
|
@ -151,8 +151,8 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
|
||||||
if (isSessionCacheable()) {
|
if (isSessionCacheable()) {
|
||||||
String tableName = CacheUtil.schemaName(facets);
|
String tableName = CacheUtil.schemaName(facets);
|
||||||
cachedResult = (E) sessionOps.checkCache(tableName, facets);
|
cachedResult = (E) sessionOps.checkCache(tableName, facets);
|
||||||
Class<?> iface = MappingUtil.getMappingInterface(cachedResult);
|
|
||||||
if (cachedResult != null) {
|
if (cachedResult != null) {
|
||||||
|
Class<?> iface = MappingUtil.getMappingInterface(cachedResult);
|
||||||
try {
|
try {
|
||||||
if (Drafted.class.isAssignableFrom(iface)) {
|
if (Drafted.class.isAssignableFrom(iface)) {
|
||||||
result = Optional.of(cachedResult);
|
result = Optional.of(cachedResult);
|
||||||
|
|
|
@ -157,8 +157,8 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
|
||||||
if (isSessionCacheable()) {
|
if (isSessionCacheable()) {
|
||||||
String tableName = CacheUtil.schemaName(facets);
|
String tableName = CacheUtil.schemaName(facets);
|
||||||
cachedResult = (E) sessionOps.checkCache(tableName, facets);
|
cachedResult = (E) sessionOps.checkCache(tableName, facets);
|
||||||
Class<?> iface = MappingUtil.getMappingInterface(cachedResult);
|
|
||||||
if (cachedResult != null) {
|
if (cachedResult != null) {
|
||||||
|
Class<?> iface = MappingUtil.getMappingInterface(cachedResult);
|
||||||
E result = null;
|
E result = null;
|
||||||
try {
|
try {
|
||||||
if (Drafted.class.isAssignableFrom(iface)) {
|
if (Drafted.class.isAssignableFrom(iface)) {
|
||||||
|
|
|
@ -38,7 +38,6 @@ import net.helenus.support.HelenusException;
|
||||||
import net.helenus.support.HelenusMappingException;
|
import net.helenus.support.HelenusMappingException;
|
||||||
import net.helenus.support.Immutables;
|
import net.helenus.support.Immutables;
|
||||||
|
|
||||||
|
|
||||||
public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateOperation<E>> {
|
public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateOperation<E>> {
|
||||||
|
|
||||||
private final Map<Assignment, BoundFacet> assignments = new HashMap<>();
|
private final Map<Assignment, BoundFacet> assignments = new HashMap<>();
|
||||||
|
@ -787,13 +786,14 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
|
||||||
@Override
|
@Override
|
||||||
public E sync() throws TimeoutException {
|
public E sync() throws TimeoutException {
|
||||||
E result = super.sync();
|
E result = super.sync();
|
||||||
if (entity.isCacheable()) {
|
if (result != null && entity.isCacheable()) {
|
||||||
if (draft != null) {
|
if (draft != null) {
|
||||||
sessionOps.updateCache(draft, bindFacetValues());
|
|
||||||
adjustTtlAndWriteTime(draft);
|
adjustTtlAndWriteTime(draft);
|
||||||
|
adjustTtlAndWriteTime((MapExportable) result);
|
||||||
|
sessionOps.updateCache(result, bindFacetValues());
|
||||||
} else if (pojo != null) {
|
} else if (pojo != null) {
|
||||||
sessionOps.updateCache(pojo, bindFacetValues());
|
|
||||||
adjustTtlAndWriteTime((MapExportable) pojo);
|
adjustTtlAndWriteTime((MapExportable) pojo);
|
||||||
|
sessionOps.updateCache(pojo, bindFacetValues());
|
||||||
} else {
|
} else {
|
||||||
sessionOps.cacheEvict(bindFacetValues());
|
sessionOps.cacheEvict(bindFacetValues());
|
||||||
}
|
}
|
||||||
|
@ -807,16 +807,18 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
|
||||||
return sync();
|
return sync();
|
||||||
}
|
}
|
||||||
E result = super.sync(uow);
|
E result = super.sync(uow);
|
||||||
if (draft != null) {
|
if (result != null) {
|
||||||
adjustTtlAndWriteTime(draft);
|
if (draft != null) {
|
||||||
|
adjustTtlAndWriteTime(draft);
|
||||||
|
}
|
||||||
if (entity != null && MapExportable.class.isAssignableFrom(entity.getMappingInterface())) {
|
if (entity != null && MapExportable.class.isAssignableFrom(entity.getMappingInterface())) {
|
||||||
adjustTtlAndWriteTime((MapExportable) result);
|
adjustTtlAndWriteTime((MapExportable) result);
|
||||||
|
cacheUpdate(uow, result, bindFacetValues());
|
||||||
|
} else if (pojo != null) {
|
||||||
|
adjustTtlAndWriteTime((MapExportable) pojo);
|
||||||
|
cacheUpdate(uow, (E) pojo, bindFacetValues());
|
||||||
|
return (E) pojo;
|
||||||
}
|
}
|
||||||
cacheUpdate(uow, result, bindFacetValues());
|
|
||||||
} else if (pojo != null) {
|
|
||||||
cacheUpdate(uow, (E) pojo, bindFacetValues());
|
|
||||||
adjustTtlAndWriteTime((MapExportable) pojo);
|
|
||||||
return (E) pojo;
|
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,6 @@ public class EntityDraftBuilderTest extends AbstractEmbeddedCassandraTest {
|
||||||
Supply s2 =
|
Supply s2 =
|
||||||
session
|
session
|
||||||
.<Supply>update(s1.update())
|
.<Supply>update(s1.update())
|
||||||
.and(supply::region, eq(region))
|
|
||||||
.prepend(supply::suppliers, "Pignose Supply, LLC.")
|
.prepend(supply::suppliers, "Pignose Supply, LLC.")
|
||||||
.sync();
|
.sync();
|
||||||
|
|
||||||
|
|
|
@ -264,8 +264,13 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
|
||||||
Assert.assertEquals(w1, w2);
|
Assert.assertEquals(w1, w2);
|
||||||
|
|
||||||
// This should remove the object from the session cache.
|
// This should remove the object from the session cache.
|
||||||
|
session.<Widget>update(w2).set(widget::name, "Bill").where(widget::id, eq(key)).sync(uow);
|
||||||
w3 =
|
w3 =
|
||||||
session.<Widget>update(w2).set(widget::name, "Bill").where(widget::id, eq(key)).sync(uow);
|
session
|
||||||
|
.<Widget>update(w2)
|
||||||
|
.set(widget::name, w1.name())
|
||||||
|
.where(widget::id, eq(key))
|
||||||
|
.sync(uow);
|
||||||
|
|
||||||
// Fetch from session cache will cache miss (as it was updated) and trigger a SELECT.
|
// Fetch from session cache will cache miss (as it was updated) and trigger a SELECT.
|
||||||
w4 = session.<Widget>select(widget).where(widget::id, eq(key)).single().sync().orElse(null);
|
w4 = session.<Widget>select(widget).where(widget::id, eq(key)).single().sync().orElse(null);
|
||||||
|
@ -284,7 +289,6 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
|
||||||
|
|
||||||
Assert.assertTrue(w5.equals(w2));
|
Assert.assertTrue(w5.equals(w2));
|
||||||
Assert.assertTrue(w2.equals(w5));
|
Assert.assertTrue(w2.equals(w5));
|
||||||
Assert.assertEquals(w5.name(), "Bill");
|
|
||||||
|
|
||||||
uow.commit()
|
uow.commit()
|
||||||
.andThen(
|
.andThen(
|
||||||
|
|
Loading…
Reference in a new issue