Compare commits

...

61 commits

Author SHA1 Message Date
Greg Burd
9db3500e3a Repair the copyright per Casser author's request. Also fix DOS line endings. 2018-11-10 08:50:02 -05:00
Greg Burd
d179539a31 Only unwrap cache implementation for clients who are expecting the Map interface. 2018-04-23 12:37:49 -04:00
Greg Burd
06fe21d08e Update visibility to allow subclasses to change log message. 2018-04-19 10:44:39 -04:00
Greg Burd
099fd999ee Don't stop the stopwatch if it's not running. 2018-04-04 14:34:59 -04:00
Greg Burd
662a697d03 Changes to andThen/orElse and adding exceptionally logic. On commit one or the other of andThen or orElse will fire based on the path (commit, abort). If during any of those an exception is thrown the exceptionally is called with the exception. 2018-04-02 11:45:49 -04:00
b449817659
Merge pull request #2 from onshape/jgdef_delete_and_fixes
Delegate-based delete-tracking cache implementation, fixes to javax.cache.Cache implementation, and unit tests
2018-04-01 08:59:41 -04:00
John de Freitas
e6051b12e9 Delegate-based delete-tracking cache implementation, fixes to javax.cache.Cache implementation, and unit tests 2018-03-31 22:24:12 -04:00
John de Freitas
9f511cde74 Revert "Not-yet-tested tracking of UOW deletes"
This reverts commit b27bc7d9a9.
2018-03-31 22:21:28 -04:00
John de Freitas
4b9187ebe5 Revert "Delegate-based evict-tracking cache, fixes to core javax.cache.Cache implemenation MapCache, and unit tests"
This reverts commit 93a81e7fd0.
2018-03-31 22:21:18 -04:00
John de Freitas
93a81e7fd0 Delegate-based evict-tracking cache, fixes to core javax.cache.Cache implemenation MapCache, and unit tests 2018-03-31 22:18:28 -04:00
John de Freitas
3169d0c100 Not-yet-tested tracking of UOW deletes 2018-03-31 22:17:00 -04:00
John de Freitas
b27bc7d9a9 Not-yet-tested tracking of UOW deletes 2018-03-30 11:12:00 -04:00
Greg Burd
1f4c2154e2 Remove purpse guessing logic, it's overhead and buggy. 2018-03-29 11:57:43 -04:00
Greg Burd
6788cea1a0 Fix array out of bound bug in UnitOfWork purpose discovery logic. 2018-03-29 11:16:57 -04:00
Greg Burd
9d94e865b6 Allow subclasses access to the session. 2018-03-28 09:15:15 -04:00
Greg Burd
654f4434bf Remove Zipkin. Revert abstracted UnitOfWork into single class. 2018-03-28 08:27:49 -04:00
Greg Burd
6c245c121e Avoid NPE when batch passed in was null. 2018-03-05 08:37:48 -05:00
Greg Burd
ef455ac032 Find table name after binding facets. Revert duplicate null check. 2018-03-01 06:38:47 -07:00
Greg Burd
b1e333009c Don't merge in keys with null values, JCache doesn't support null values. 2018-03-01 05:23:43 -07:00
Greg Burd
ca6afc326c Qualify index names with their table name. 2018-02-28 06:48:30 -05:00
Greg Burd
af4156079d Formatting and fixes to use MapCache in the UOW. 2018-02-10 12:32:54 -05:00
Greg Burd
b023ec359b Moving toward javax.cache.Cache-based UOW cache API. 2018-02-09 21:55:23 -05:00
Greg Burd
76b603f3d3 Move the Guava JCache provider into the test targets only, don't assume a CacheManager instance exists. 2018-02-08 14:46:31 -05:00
Greg Burd
d69d8a3b1e Finish first steps of JCache integration, UnitOfWork statement cache now merges into available JCache at commit. 2018-02-08 10:09:23 -05:00
Greg Burd
6858cf6f48 Integrate JCache for cached objects outside of a UnitOfWork. 2018-02-07 18:41:39 -05:00
5215749de1
Merge pull request #1 from onshape/pbeaman/remove-jamm-0.2.5-dependency
Remove test-scope dependency on jamm-0.2.5 because it conflicts with …
2018-01-27 06:41:46 -05:00
pbeaman
2299939be3 Remove test-scope dependency on jamm-0.2.5 because it conflicts with a later version and causes NoSuchMethodError when running unit tests in Eclipse 2018-01-26 10:42:13 -05:00
Greg Burd
0ddacec354 Removing Gradle files until switch is complete. 2018-01-25 16:45:13 -05:00
Greg Burd
287e1a5b8b Use DataStax/Cassandra timestamp generator. 2018-01-25 14:21:21 -05:00
Greg Burd
8b9d582fa5 Mutation only needs to be different and can occur even when current is null. 2018-01-25 12:57:37 -05:00
Greg Burd
96a8476fd8 Null values have no keys in the map, so this test fails. In most casses we're here after we resolve a getter so we know this key is valid. Skip the check. 2018-01-25 12:41:13 -05:00
Greg Burd
f168b33f6a Formatting. 2018-01-25 11:30:41 -05:00
Greg Burd
11de7015c2 Change mutate on drafts to be a bit faster. 2018-01-25 11:26:12 -05:00
Greg Burd
f9b1563bdd Ensure there is a batch. 2018-01-24 16:11:44 -05:00
Greg Burd
e2f45f82c9 Fix logged batch syntax and timestamp logic. 2018-01-24 14:33:37 -05:00
Greg Burd
27dd9a4eff Include the post commit/abort function execution time that happens within/associated-with the scope of a UOW. Fix reset method to check both maps for updates. 2018-01-24 09:28:19 -05:00
Greg Burd
26f41dab75 Add notion of statement cache to UnitOfWork. Ignore call to useKeyspace when session isn't valid. 2018-01-17 12:38:33 -05:00
Greg Burd
1da822ce57 Fix build. Change scripts to use /usr/bin/env so that they work on NiXOS. Revert abort() logic. 2018-01-16 13:10:12 -05:00
Greg Burd
1ef50ae179 Allow an init path that doesn't require a non-null Cassandra/DataStax session/cluster context. 2018-01-16 11:59:51 -05:00
Greg Burd
26c67e391a Review related fixes. 2018-01-15 11:28:04 -05:00
Greg Burd
3554b7ecb5 Add isDone() method on UnitOfWork. 2018-01-14 12:51:49 -05:00
Greg Burd
7b4e46431f Update DataStax driver version. 2017-12-14 10:19:38 -05:00
Greg Burd
60b040e7a9 Always start the timer. 2017-12-11 16:04:28 -05:00
Greg Burd
7a470bd5d7 Formatting. 2017-11-15 22:39:50 -05:00
Greg Burd
0827291253 Spoil futures not completed before an abort/commit of the UOW they belong too. Track read set for Entity/Drafted model objects. 2017-11-15 13:56:03 -05:00
Greg Burd
50f656bc8a Fix commit.andThen() logic. 2017-11-15 09:16:15 -05:00
Greg Burd
9df97b3e44 WIP: commit.exceptionally() is working but somehow in the process I broke commit.andThen(). 2017-11-14 22:37:37 -05:00
Greg Burd
33b4b35912 Formatting. 2017-11-14 15:42:16 -05:00
Greg Burd
1eccb631f3 Fix logic that was failing to cache results on cache miss. 2017-11-14 15:26:16 -05:00
Greg Burd
e932d0dcf2 Check to ensure value not null. 2017-11-14 10:06:13 -05:00
Greg Burd
618a7ea380 Draft instances map is mutable and so are collection values inside the map, this makes the UPDATE logic straight forward when mutating in-cache draft objects. Also, fix one or two logic bugs with isAssignableFrom(). 2017-11-14 09:55:03 -05:00
Greg Burd
7a56059036 Fix misuse of Drafted interface in tests. WIP fixing use of immutable collections in UPDATE/draft logic. 2017-11-13 15:55:24 -05:00
Greg Burd
33d2459538 Sometimes there are no filters. 2017-11-13 11:01:30 -05:00
Greg Burd
a63a1be4b6 Paranoia. 2017-11-13 10:47:48 -05:00
Greg Burd
d30361538c Operations now default to non-idempotent unless explictly set in the statement or if they contain fields that are idempotent (e.g. @Column(idempotent=true) or part of the primary key for the row). 2017-11-13 10:36:16 -05:00
Greg Burd
a993af6c29 Enable toggle for showing values in logged CQL statements. Default to true. 2017-11-12 22:32:58 -05:00
Greg Burd
39a8643103 Fix a few things. 2017-11-12 21:37:59 -05:00
Greg Burd
c025dc35a7 Formatting. 2017-11-12 20:14:31 -05:00
Greg Burd
d19a9c741d Fixing a few corners of caching when using drafted entity objects. Working out kinks in merge logic for entity instances in UOWs. 2017-11-10 22:48:40 -05:00
Greg Burd
6ff188f241 Move statement logging into Operation, cover special case for batches. Cleanup UOW commit logging a bit. 2017-11-09 15:03:30 -05:00
Greg Burd
2f0801d36f Fix test to see if select is of Fun<?> type. 2017-11-09 13:32:16 -05:00
294 changed files with 4291 additions and 3289 deletions

8
NOTES
View file

@ -22,6 +22,14 @@ Operation/
`-- PreparedStreamOperation
----
@CompoundIndex()
create a new col in the same table called __idx_a_b_c that the hash of the concatenated values in that order is stored, create a normal index for that (CREATE INDEX ...)
if a query matches that set of columns then use that indexed col to fetch the desired results from that table
could also work with .in() query if materialized view exists
----
// TODO(gburd): create a statement that matches one that wasn't prepared
//String key =
// "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString();

View file

@ -1,3 +1,3 @@
#!/bin/bash
#!/usr/bin/env bash
mvn clean jar:jar javadoc:jar source:jar deploy -Prelease
mvn clean jar:jar javadoc:jar source:jar deploy -Prelease

View file

@ -1,7 +1,14 @@
#!/bin/bash
#!/usr/bin/env bash
for f in $(find ./src -name \*.java); do
echo Formatting $f
java -jar ./lib/google-java-format-1.3-all-deps.jar --replace $f
done
if [ "X$1" == "Xall" ]; then
for f in $(find ./src -name \*.java); do
echo Formatting $f
java -jar ./lib/google-java-format-1.3-all-deps.jar --replace $f
done
else
for file in $(git status --short | awk '{print $2}'); do
echo $file
java -jar ./lib/google-java-format-1.3-all-deps.jar --replace $file
done
fi

View file

@ -1,3 +1,3 @@
#!/bin/bash
#!/usr/bin/env bash
mvn clean jar:jar javadoc:jar source:jar install -Prelease

View file

@ -1,90 +0,0 @@
// gradle wrapper
// ./gradlew clean generateLock saveLock
// ./gradlew compileJava
// ./gradlew run
// ./gradlew run --debug-jvm
// ./gradlew publishToMavenLocal
buildscript {
ext {}
repositories {
jcenter()
mavenLocal()
mavenCentral()
maven { url "https://clojars.org/repo" }
maven { url "https://plugins.gradle.org/m2/" }
}
dependencies {
classpath 'com.netflix.nebula:gradle-dependency-lock-plugin:4.+'
classpath 'com.uber:okbuck:0.19.0'
}
}
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'
apply plugin: 'java-library'
apply plugin: 'maven-publish'
apply plugin: 'com.uber.okbuck'
apply plugin: 'nebula.dependency-lock'
task wrapper(type: Wrapper) {
gradleVersion = '4.0.2'
}
jar {
baseName = 'helenus'
group = 'net.helenus'
version = '2.0.17-SNAPSHOT'
}
description = """helenus"""
sourceCompatibility = 1.8
targetCompatibility = 1.8
tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
}
configurations.all {
}
repositories {
jcenter()
mavenLocal()
mavenCentral()
maven { url "file:///Users/gburd/ws/helenus/lib" }
maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
maven { url "http://repo.maven.apache.org/maven2" }
}
dependencies {
compile group: 'com.datastax.cassandra', name: 'cassandra-driver-core', version: '3.3.0'
compile group: 'org.aspectj', name: 'aspectjrt', version: '1.8.10'
compile group: 'org.aspectj', name: 'aspectjweaver', version: '1.8.10'
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.6'
compile group: 'org.springframework', name: 'spring-core', version: '4.3.10.RELEASE'
compile group: 'com.google.guava', name: 'guava', version: '20.0'
compile group: 'com.diffplug.durian', name: 'durian', version: '3.+'
compile group: 'io.zipkin.java', name: 'zipkin', version: '1.29.2'
compile group: 'io.zipkin.brave', name: 'brave', version: '4.0.6'
compile group: 'io.dropwizard.metrics', name: 'metrics-core', version: '3.2.2'
compile group: 'javax.validation', name: 'validation-api', version: '2.0.0.CR3'
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.1'
runtime group: 'org.slf4j', name: 'jcl-over-slf4j', version: '1.7.1'
testCompile group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version: '1.9.13'
testCompile group: 'com.anthemengineering.mojo', name: 'infer-maven-plugin', version: '0.1.0'
testCompile group: 'org.codehaus.jackson', name: 'jackson-core-asl', version: '1.9.13'
testCompile(group: 'org.cassandraunit', name: 'cassandra-unit', version: '3.1.4.0-SNAPSHOT') {
exclude(module: 'cassandra-driver-core')
}
testCompile group: 'org.apache.cassandra', name: 'cassandra-all', version: '3.11.0'
testCompile group: 'commons-io', name: 'commons-io', version: '2.5'
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'com.github.stephenc', name: 'jamm', version: '0.2.5'
testCompile group: 'org.hamcrest', name: 'hamcrest-library', version: '1.3'
testCompile group: 'org.hamcrest', name: 'hamcrest-core', version: '1.3'
testCompile group: 'org.mockito', name: 'mockito-core', version: '2.8.47'
}

View file

@ -1,648 +0,0 @@
{
"compile": {
"com.datastax.cassandra:cassandra-driver-core": {
"locked": "3.3.0",
"requested": "3.3.0"
},
"com.diffplug.durian:durian": {
"locked": "3.5.0-SNAPSHOT",
"requested": "3.+"
},
"com.google.guava:guava": {
"locked": "20.0",
"requested": "20.0"
},
"io.dropwizard.metrics:metrics-core": {
"locked": "3.2.2",
"requested": "3.2.2"
},
"io.zipkin.brave:brave": {
"locked": "4.0.6",
"requested": "4.0.6"
},
"io.zipkin.java:zipkin": {
"locked": "1.29.2",
"requested": "1.29.2"
},
"javax.validation:validation-api": {
"locked": "2.0.0.CR3",
"requested": "2.0.0.CR3"
},
"org.apache.commons:commons-lang3": {
"locked": "3.6",
"requested": "3.6"
},
"org.aspectj:aspectjrt": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.aspectj:aspectjweaver": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.slf4j:slf4j-api": {
"locked": "1.7.25",
"requested": "1.7.1"
},
"org.springframework:spring-core": {
"locked": "4.3.10.RELEASE",
"requested": "4.3.10.RELEASE"
}
},
"compileClasspath": {
"com.datastax.cassandra:cassandra-driver-core": {
"locked": "3.3.0",
"requested": "3.3.0"
},
"com.diffplug.durian:durian": {
"locked": "3.5.0-SNAPSHOT",
"requested": "3.+"
},
"com.google.guava:guava": {
"locked": "20.0",
"requested": "20.0"
},
"io.dropwizard.metrics:metrics-core": {
"locked": "3.2.2",
"requested": "3.2.2"
},
"io.zipkin.brave:brave": {
"locked": "4.0.6",
"requested": "4.0.6"
},
"io.zipkin.java:zipkin": {
"locked": "1.29.2",
"requested": "1.29.2"
},
"javax.validation:validation-api": {
"locked": "2.0.0.CR3",
"requested": "2.0.0.CR3"
},
"org.apache.commons:commons-lang3": {
"locked": "3.6",
"requested": "3.6"
},
"org.aspectj:aspectjrt": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.aspectj:aspectjweaver": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.slf4j:slf4j-api": {
"locked": "1.7.25",
"requested": "1.7.1"
},
"org.springframework:spring-core": {
"locked": "4.3.10.RELEASE",
"requested": "4.3.10.RELEASE"
}
},
"default": {
"com.datastax.cassandra:cassandra-driver-core": {
"locked": "3.3.0",
"requested": "3.3.0"
},
"com.diffplug.durian:durian": {
"locked": "3.5.0-SNAPSHOT",
"requested": "3.+"
},
"com.google.guava:guava": {
"locked": "20.0",
"requested": "20.0"
},
"io.dropwizard.metrics:metrics-core": {
"locked": "3.2.2",
"requested": "3.2.2"
},
"io.zipkin.brave:brave": {
"locked": "4.0.6",
"requested": "4.0.6"
},
"io.zipkin.java:zipkin": {
"locked": "1.29.2",
"requested": "1.29.2"
},
"javax.validation:validation-api": {
"locked": "2.0.0.CR3",
"requested": "2.0.0.CR3"
},
"org.apache.commons:commons-lang3": {
"locked": "3.6",
"requested": "3.6"
},
"org.aspectj:aspectjrt": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.aspectj:aspectjweaver": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.slf4j:jcl-over-slf4j": {
"locked": "1.7.1",
"requested": "1.7.1"
},
"org.slf4j:slf4j-api": {
"locked": "1.7.25",
"requested": "1.7.1"
},
"org.springframework:spring-core": {
"locked": "4.3.10.RELEASE",
"requested": "4.3.10.RELEASE"
}
},
"runtime": {
"com.datastax.cassandra:cassandra-driver-core": {
"locked": "3.3.0",
"requested": "3.3.0"
},
"com.diffplug.durian:durian": {
"locked": "3.5.0-SNAPSHOT",
"requested": "3.+"
},
"com.google.guava:guava": {
"locked": "20.0",
"requested": "20.0"
},
"io.dropwizard.metrics:metrics-core": {
"locked": "3.2.2",
"requested": "3.2.2"
},
"io.zipkin.brave:brave": {
"locked": "4.0.6",
"requested": "4.0.6"
},
"io.zipkin.java:zipkin": {
"locked": "1.29.2",
"requested": "1.29.2"
},
"javax.validation:validation-api": {
"locked": "2.0.0.CR3",
"requested": "2.0.0.CR3"
},
"org.apache.commons:commons-lang3": {
"locked": "3.6",
"requested": "3.6"
},
"org.aspectj:aspectjrt": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.aspectj:aspectjweaver": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.slf4j:jcl-over-slf4j": {
"locked": "1.7.1",
"requested": "1.7.1"
},
"org.slf4j:slf4j-api": {
"locked": "1.7.25",
"requested": "1.7.1"
},
"org.springframework:spring-core": {
"locked": "4.3.10.RELEASE",
"requested": "4.3.10.RELEASE"
}
},
"runtimeClasspath": {
"com.datastax.cassandra:cassandra-driver-core": {
"locked": "3.3.0",
"requested": "3.3.0"
},
"com.diffplug.durian:durian": {
"locked": "3.5.0-SNAPSHOT",
"requested": "3.+"
},
"com.google.guava:guava": {
"locked": "20.0",
"requested": "20.0"
},
"io.dropwizard.metrics:metrics-core": {
"locked": "3.2.2",
"requested": "3.2.2"
},
"io.zipkin.brave:brave": {
"locked": "4.0.6",
"requested": "4.0.6"
},
"io.zipkin.java:zipkin": {
"locked": "1.29.2",
"requested": "1.29.2"
},
"javax.validation:validation-api": {
"locked": "2.0.0.CR3",
"requested": "2.0.0.CR3"
},
"org.apache.commons:commons-lang3": {
"locked": "3.6",
"requested": "3.6"
},
"org.aspectj:aspectjrt": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.aspectj:aspectjweaver": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.slf4j:jcl-over-slf4j": {
"locked": "1.7.1",
"requested": "1.7.1"
},
"org.slf4j:slf4j-api": {
"locked": "1.7.25",
"requested": "1.7.1"
},
"org.springframework:spring-core": {
"locked": "4.3.10.RELEASE",
"requested": "4.3.10.RELEASE"
}
},
"testCompile": {
"com.anthemengineering.mojo:infer-maven-plugin": {
"locked": "0.1.0",
"requested": "0.1.0"
},
"com.datastax.cassandra:cassandra-driver-core": {
"locked": "3.3.0",
"requested": "3.3.0"
},
"com.diffplug.durian:durian": {
"locked": "3.5.0-SNAPSHOT",
"requested": "3.+"
},
"com.github.stephenc:jamm": {
"locked": "0.2.5",
"requested": "0.2.5"
},
"com.google.guava:guava": {
"locked": "21.0",
"requested": "20.0"
},
"commons-io:commons-io": {
"locked": "2.5",
"requested": "2.5"
},
"io.dropwizard.metrics:metrics-core": {
"locked": "3.2.2",
"requested": "3.2.2"
},
"io.zipkin.brave:brave": {
"locked": "4.0.6",
"requested": "4.0.6"
},
"io.zipkin.java:zipkin": {
"locked": "1.29.2",
"requested": "1.29.2"
},
"javax.validation:validation-api": {
"locked": "2.0.0.CR3",
"requested": "2.0.0.CR3"
},
"junit:junit": {
"locked": "4.12",
"requested": "4.12"
},
"org.apache.cassandra:cassandra-all": {
"locked": "3.11.0",
"requested": "3.11.0"
},
"org.apache.commons:commons-lang3": {
"locked": "3.6",
"requested": "3.6"
},
"org.aspectj:aspectjrt": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.aspectj:aspectjweaver": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.cassandraunit:cassandra-unit": {
"locked": "3.1.4.0-SNAPSHOT",
"requested": "3.1.4.0-SNAPSHOT"
},
"org.codehaus.jackson:jackson-core-asl": {
"locked": "1.9.13",
"requested": "1.9.13"
},
"org.codehaus.jackson:jackson-mapper-asl": {
"locked": "1.9.13",
"requested": "1.9.13"
},
"org.hamcrest:hamcrest-core": {
"locked": "1.3",
"requested": "1.3"
},
"org.hamcrest:hamcrest-library": {
"locked": "1.3",
"requested": "1.3"
},
"org.mockito:mockito-core": {
"locked": "2.8.47",
"requested": "2.8.47"
},
"org.slf4j:slf4j-api": {
"locked": "1.7.25",
"requested": "1.7.1"
},
"org.springframework:spring-core": {
"locked": "4.3.10.RELEASE",
"requested": "4.3.10.RELEASE"
}
},
"testCompileClasspath": {
"com.anthemengineering.mojo:infer-maven-plugin": {
"locked": "0.1.0",
"requested": "0.1.0"
},
"com.datastax.cassandra:cassandra-driver-core": {
"locked": "3.3.0",
"requested": "3.3.0"
},
"com.diffplug.durian:durian": {
"locked": "3.5.0-SNAPSHOT",
"requested": "3.+"
},
"com.github.stephenc:jamm": {
"locked": "0.2.5",
"requested": "0.2.5"
},
"com.google.guava:guava": {
"locked": "21.0",
"requested": "20.0"
},
"commons-io:commons-io": {
"locked": "2.5",
"requested": "2.5"
},
"io.dropwizard.metrics:metrics-core": {
"locked": "3.2.2",
"requested": "3.2.2"
},
"io.zipkin.brave:brave": {
"locked": "4.0.6",
"requested": "4.0.6"
},
"io.zipkin.java:zipkin": {
"locked": "1.29.2",
"requested": "1.29.2"
},
"javax.validation:validation-api": {
"locked": "2.0.0.CR3",
"requested": "2.0.0.CR3"
},
"junit:junit": {
"locked": "4.12",
"requested": "4.12"
},
"org.apache.cassandra:cassandra-all": {
"locked": "3.11.0",
"requested": "3.11.0"
},
"org.apache.commons:commons-lang3": {
"locked": "3.6",
"requested": "3.6"
},
"org.aspectj:aspectjrt": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.aspectj:aspectjweaver": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.cassandraunit:cassandra-unit": {
"locked": "3.1.4.0-SNAPSHOT",
"requested": "3.1.4.0-SNAPSHOT"
},
"org.codehaus.jackson:jackson-core-asl": {
"locked": "1.9.13",
"requested": "1.9.13"
},
"org.codehaus.jackson:jackson-mapper-asl": {
"locked": "1.9.13",
"requested": "1.9.13"
},
"org.hamcrest:hamcrest-core": {
"locked": "1.3",
"requested": "1.3"
},
"org.hamcrest:hamcrest-library": {
"locked": "1.3",
"requested": "1.3"
},
"org.mockito:mockito-core": {
"locked": "2.8.47",
"requested": "2.8.47"
},
"org.slf4j:slf4j-api": {
"locked": "1.7.25",
"requested": "1.7.1"
},
"org.springframework:spring-core": {
"locked": "4.3.10.RELEASE",
"requested": "4.3.10.RELEASE"
}
},
"testRuntime": {
"com.anthemengineering.mojo:infer-maven-plugin": {
"locked": "0.1.0",
"requested": "0.1.0"
},
"com.datastax.cassandra:cassandra-driver-core": {
"locked": "3.3.0",
"requested": "3.3.0"
},
"com.diffplug.durian:durian": {
"locked": "3.5.0-SNAPSHOT",
"requested": "3.+"
},
"com.github.stephenc:jamm": {
"locked": "0.2.5",
"requested": "0.2.5"
},
"com.google.guava:guava": {
"locked": "21.0",
"requested": "20.0"
},
"commons-io:commons-io": {
"locked": "2.5",
"requested": "2.5"
},
"io.dropwizard.metrics:metrics-core": {
"locked": "3.2.2",
"requested": "3.2.2"
},
"io.zipkin.brave:brave": {
"locked": "4.0.6",
"requested": "4.0.6"
},
"io.zipkin.java:zipkin": {
"locked": "1.29.2",
"requested": "1.29.2"
},
"javax.validation:validation-api": {
"locked": "2.0.0.CR3",
"requested": "2.0.0.CR3"
},
"junit:junit": {
"locked": "4.12",
"requested": "4.12"
},
"org.apache.cassandra:cassandra-all": {
"locked": "3.11.0",
"requested": "3.11.0"
},
"org.apache.commons:commons-lang3": {
"locked": "3.6",
"requested": "3.6"
},
"org.aspectj:aspectjrt": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.aspectj:aspectjweaver": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.cassandraunit:cassandra-unit": {
"locked": "3.1.4.0-SNAPSHOT",
"requested": "3.1.4.0-SNAPSHOT"
},
"org.codehaus.jackson:jackson-core-asl": {
"locked": "1.9.13",
"requested": "1.9.13"
},
"org.codehaus.jackson:jackson-mapper-asl": {
"locked": "1.9.13",
"requested": "1.9.13"
},
"org.hamcrest:hamcrest-core": {
"locked": "1.3",
"requested": "1.3"
},
"org.hamcrest:hamcrest-library": {
"locked": "1.3",
"requested": "1.3"
},
"org.mockito:mockito-core": {
"locked": "2.8.47",
"requested": "2.8.47"
},
"org.slf4j:jcl-over-slf4j": {
"locked": "1.7.7",
"requested": "1.7.1"
},
"org.slf4j:slf4j-api": {
"locked": "1.7.25",
"requested": "1.7.1"
},
"org.springframework:spring-core": {
"locked": "4.3.10.RELEASE",
"requested": "4.3.10.RELEASE"
}
},
"testRuntimeClasspath": {
"com.anthemengineering.mojo:infer-maven-plugin": {
"locked": "0.1.0",
"requested": "0.1.0"
},
"com.datastax.cassandra:cassandra-driver-core": {
"locked": "3.3.0",
"requested": "3.3.0"
},
"com.diffplug.durian:durian": {
"locked": "3.5.0-SNAPSHOT",
"requested": "3.+"
},
"com.github.stephenc:jamm": {
"locked": "0.2.5",
"requested": "0.2.5"
},
"com.google.guava:guava": {
"locked": "21.0",
"requested": "20.0"
},
"commons-io:commons-io": {
"locked": "2.5",
"requested": "2.5"
},
"io.dropwizard.metrics:metrics-core": {
"locked": "3.2.2",
"requested": "3.2.2"
},
"io.zipkin.brave:brave": {
"locked": "4.0.6",
"requested": "4.0.6"
},
"io.zipkin.java:zipkin": {
"locked": "1.29.2",
"requested": "1.29.2"
},
"javax.validation:validation-api": {
"locked": "2.0.0.CR3",
"requested": "2.0.0.CR3"
},
"junit:junit": {
"locked": "4.12",
"requested": "4.12"
},
"org.apache.cassandra:cassandra-all": {
"locked": "3.11.0",
"requested": "3.11.0"
},
"org.apache.commons:commons-lang3": {
"locked": "3.6",
"requested": "3.6"
},
"org.aspectj:aspectjrt": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.aspectj:aspectjweaver": {
"locked": "1.8.10",
"requested": "1.8.10"
},
"org.cassandraunit:cassandra-unit": {
"locked": "3.1.4.0-SNAPSHOT",
"requested": "3.1.4.0-SNAPSHOT"
},
"org.codehaus.jackson:jackson-core-asl": {
"locked": "1.9.13",
"requested": "1.9.13"
},
"org.codehaus.jackson:jackson-mapper-asl": {
"locked": "1.9.13",
"requested": "1.9.13"
},
"org.hamcrest:hamcrest-core": {
"locked": "1.3",
"requested": "1.3"
},
"org.hamcrest:hamcrest-library": {
"locked": "1.3",
"requested": "1.3"
},
"org.mockito:mockito-core": {
"locked": "2.8.47",
"requested": "2.8.47"
},
"org.slf4j:jcl-over-slf4j": {
"locked": "1.7.7",
"requested": "1.7.1"
},
"org.slf4j:slf4j-api": {
"locked": "1.7.25",
"requested": "1.7.1"
},
"org.springframework:spring-core": {
"locked": "4.3.10.RELEASE",
"requested": "4.3.10.RELEASE"
}
}
}

View file

@ -11,7 +11,7 @@
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: com.datastax.cassandra:cassandra-driver-core:3.3.0" level="project" />
<orderEntry type="library" name="Maven: com.datastax.cassandra:cassandra-driver-core:3.3.2" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty-handler:4.0.47.Final" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty-buffer:4.0.47.Final" level="project" />
<orderEntry type="library" name="Maven: io.netty:netty-common:4.0.47.Final" level="project" />
@ -28,15 +28,14 @@
<orderEntry type="library" name="Maven: com.github.jnr:jnr-x86asm:1.0.2" level="project" />
<orderEntry type="library" name="Maven: com.github.jnr:jnr-posix:3.0.27" level="project" />
<orderEntry type="library" name="Maven: com.github.jnr:jnr-constants:0.9.0" level="project" />
<orderEntry type="library" name="Maven: com.datastax.cassandra:cassandra-driver-extras:3.3.2" level="project" />
<orderEntry type="library" name="Maven: com.diffplug.durian:durian:3.4.0" level="project" />
<orderEntry type="library" name="Maven: org.aspectj:aspectjweaver:1.8.10" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.6" level="project" />
<orderEntry type="library" name="Maven: org.springframework:spring-core:4.3.10.RELEASE" level="project" />
<orderEntry type="library" name="Maven: commons-logging:commons-logging:1.2" level="project" />
<orderEntry type="library" name="Maven: javax.cache:cache-api:1.1.0" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:20.0" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.java:zipkin:1.29.2" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.brave:brave:4.0.6" level="project" />
<orderEntry type="library" name="Maven: io.zipkin.reporter:zipkin-reporter:0.6.12" level="project" />
<orderEntry type="library" name="Maven: io.dropwizard.metrics:metrics-core:3.2.2" level="project" />
<orderEntry type="library" name="Maven: javax.validation:validation-api:2.0.0.CR3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.9.13" level="project" />
@ -115,9 +114,9 @@
<orderEntry type="library" scope="TEST" name="Maven: org.caffinitas.ohc:ohc-core:0.4.4" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.github.ben-manes.caffeine:caffeine:2.2.6" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.jctools:jctools-core:1.2.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: ca.exprofesso:guava-jcache:1.0.4" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: commons-io:commons-io:2.5" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.12" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: com.github.stephenc:jamm:0.2.5" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-library:1.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.mockito:mockito-core:2.8.47" level="project" />

54
pom.xml
View file

@ -109,7 +109,13 @@
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.3.0</version>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
@ -136,25 +142,19 @@
<version>4.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
<!-- Metrics and tracing -->
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin</artifactId>
<version>1.29.2</version>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave</artifactId>
<version>4.0.6</version>
</dependency>
<!-- Metrics -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
@ -211,6 +211,24 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>ca.exprofesso</groupId>
<artifactId>guava-jcache</artifactId>
<version>1.0.4</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
@ -225,13 +243,6 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.stephenc</groupId>
<artifactId>jamm</artifactId>
<version>0.2.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
@ -266,7 +277,6 @@
<version>1.7.1</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>

View file

@ -1 +0,0 @@
rootProject.name = 'helenus-core'

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -5,7 +5,7 @@ import com.datastax.driver.core.querybuilder.Select;
public class CreateMaterializedView extends Create {
private String viewName;
private final String viewName;
private Select.Where selection;
private String primaryKey;
private String clustering;

View file

@ -1,47 +1,48 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.config;
import java.lang.reflect.Method;
import java.util.function.Function;
import net.helenus.core.DslInstantiator;
import net.helenus.core.MapperInstantiator;
import net.helenus.core.reflect.ReflectionDslInstantiator;
import net.helenus.core.reflect.ReflectionMapperInstantiator;
import net.helenus.mapping.convert.CamelCaseToUnderscoreConverter;
public class DefaultHelenusSettings implements HelenusSettings {
@Override
public Function<String, String> getPropertyToColumnConverter() {
return CamelCaseToUnderscoreConverter.INSTANCE;
}
@Override
public Function<Method, Boolean> getGetterMethodDetector() {
return GetterMethodDetector.INSTANCE;
}
@Override
public DslInstantiator getDslInstantiator() {
return ReflectionDslInstantiator.INSTANCE;
}
@Override
public MapperInstantiator getMapperInstantiator() {
return ReflectionMapperInstantiator.INSTANCE;
}
}
/*
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.config;
import java.lang.reflect.Method;
import java.util.function.Function;
import net.helenus.core.DslInstantiator;
import net.helenus.core.MapperInstantiator;
import net.helenus.core.reflect.ReflectionDslInstantiator;
import net.helenus.core.reflect.ReflectionMapperInstantiator;
import net.helenus.mapping.convert.CamelCaseToUnderscoreConverter;
public class DefaultHelenusSettings implements HelenusSettings {
@Override
public Function<String, String> getPropertyToColumnConverter() {
return CamelCaseToUnderscoreConverter.INSTANCE;
}
@Override
public Function<Method, Boolean> getGetterMethodDetector() {
return GetterMethodDetector.INSTANCE;
}
@Override
public DslInstantiator getDslInstantiator() {
return ReflectionDslInstantiator.INSTANCE;
}
@Override
public MapperInstantiator getMapperInstantiator() {
return ReflectionMapperInstantiator.INSTANCE;
}
}

View file

@ -1,48 +1,49 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.config;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.function.Function;
import net.helenus.mapping.annotation.Transient;
public enum GetterMethodDetector implements Function<Method, Boolean> {
INSTANCE;
@Override
public Boolean apply(Method method) {
if (method == null) {
throw new IllegalArgumentException("empty parameter");
}
if (method.getParameterCount() != 0 || method.getReturnType() == void.class) {
return false;
}
if (Modifier.isStatic(method.getModifiers())) {
return false;
}
// Methods marked "Transient" are not mapped, skip them.
if (method.getDeclaredAnnotation(Transient.class) != null) {
return false;
}
return true;
}
}
/*
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.config;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.function.Function;
import net.helenus.mapping.annotation.Transient;
public enum GetterMethodDetector implements Function<Method, Boolean> {
INSTANCE;
@Override
public Boolean apply(Method method) {
if (method == null) {
throw new IllegalArgumentException("empty parameter");
}
if (method.getParameterCount() != 0 || method.getReturnType() == void.class) {
return false;
}
if (Modifier.isStatic(method.getModifiers())) {
return false;
}
// Methods marked "Transient" are not mapped, skip them.
if (method.getDeclaredAnnotation(Transient.class) != null) {
return false;
}
return true;
}
}

View file

@ -1,32 +1,33 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.config;
import java.lang.reflect.Method;
import java.util.function.Function;
import net.helenus.core.DslInstantiator;
import net.helenus.core.MapperInstantiator;
public interface HelenusSettings {
Function<String, String> getPropertyToColumnConverter();
Function<Method, Boolean> getGetterMethodDetector();
DslInstantiator getDslInstantiator();
MapperInstantiator getMapperInstantiator();
}
/*
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.config;
import java.lang.reflect.Method;
import java.util.function.Function;
import net.helenus.core.DslInstantiator;
import net.helenus.core.MapperInstantiator;
public interface HelenusSettings {
Function<String, String> getPropertyToColumnConverter();
Function<Method, Boolean> getGetterMethodDetector();
DslInstantiator getDslInstantiator();
MapperInstantiator getMapperInstantiator();
}

View file

@ -33,6 +33,6 @@ public abstract class AbstractAuditedEntityDraft<E> extends AbstractEntityDraft<
}
public Date createdAt() {
return (Date) get("createdAt", Date.class);
return get("createdAt", Date.class);
}
}

View file

@ -2,22 +2,36 @@ package net.helenus.core;
import com.google.common.primitives.Primitives;
import java.io.Serializable;
import java.util.*;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import net.helenus.core.reflect.DefaultPrimitiveTypes;
import net.helenus.core.reflect.Drafted;
import net.helenus.core.reflect.MapExportable;
import net.helenus.mapping.HelenusProperty;
import net.helenus.mapping.MappingUtil;
import org.apache.commons.lang3.SerializationUtils;
public abstract class AbstractEntityDraft<E> implements Drafted<E> {
private final Map<String, Object> backingMap = new HashMap<String, Object>();
private final MapExportable entity;
private final Map<String, Object> entityMap;
private final Map<String, Object> valuesMap;
private final Set<String> readSet;
private final Map<String, Object> mutationsMap = new HashMap<String, Object>();
public AbstractEntityDraft(MapExportable entity) {
this.entity = entity;
this.entityMap = entity != null ? entity.toMap() : new HashMap<String, Object>();
// Entities can mutate their map.
if (entity != null) {
this.valuesMap = entity.toMap(true);
this.readSet = entity.toReadSet();
} else {
this.valuesMap = new HashMap<String, Object>();
this.readSet = new HashSet<String>();
}
}
public abstract Class<E> getEntityClass();
@ -33,10 +47,11 @@ public abstract class AbstractEntityDraft<E> implements Drafted<E> {
@SuppressWarnings("unchecked")
public <T> T get(String key, Class<?> returnType) {
T value = (T) backingMap.get(key);
readSet.add(key);
T value = (T) mutationsMap.get(key);
if (value == null) {
value = (T) entityMap.get(key);
value = (T) valuesMap.get(key);
if (value == null) {
if (Primitives.allPrimitiveTypes().contains(returnType)) {
@ -49,14 +64,9 @@ public abstract class AbstractEntityDraft<E> implements Drafted<E> {
return (T) type.getDefaultValue();
}
} else {
// Collections fetched from the entityMap
// Collections fetched from the valuesMap
if (value instanceof Collection) {
try {
value = MappingUtil.<T>clone(value);
} catch (CloneNotSupportedException e) {
// TODO(gburd): deep?shallow? copy of List, Map, Set to a mutable collection.
value = (T) SerializationUtils.<Serializable>clone((Serializable) value);
}
value = (T) SerializationUtils.<Serializable>clone((Serializable) value);
}
}
}
@ -65,7 +75,17 @@ public abstract class AbstractEntityDraft<E> implements Drafted<E> {
}
public <T> Object set(Getter<T> getter, Object value) {
return set(this.<T>methodNameFor(getter), value);
HelenusProperty prop = MappingUtil.resolveMappingProperty(getter).getProperty();
String key = prop.getPropertyName();
HelenusValidator.INSTANCE.validate(prop, value);
if (key == null || value == null) {
return null;
}
mutationsMap.put(key, value);
return value;
}
public Object set(String key, Object value) {
@ -73,12 +93,12 @@ public abstract class AbstractEntityDraft<E> implements Drafted<E> {
return null;
}
backingMap.put(key, value);
mutationsMap.put(key, value);
return value;
}
public void put(String key, Object value) {
backingMap.put(key, value);
mutationsMap.put(key, value);
}
@SuppressWarnings("unchecked")
@ -86,27 +106,21 @@ public abstract class AbstractEntityDraft<E> implements Drafted<E> {
return (T) mutate(this.<T>methodNameFor(getter), value);
}
public Object mutate(String key, Object value) {
public <T> T mutate(String key, T value) {
Objects.requireNonNull(key);
if (value == null) {
return null;
}
if (entity != null) {
Map<String, Object> map = entity.toMap();
if (map.containsKey(key) && !value.equals(map.get(key))) {
backingMap.put(key, value);
return value;
if (value != null) {
if (entity != null) {
T currentValue = this.<T>fetch(key);
if (!value.equals(currentValue)) {
mutationsMap.put(key, value);
return value;
}
} else {
mutationsMap.put(key, value);
}
return map.get(key);
} else {
backingMap.put(key, value);
return null;
}
return null;
}
private <T> String methodNameFor(Getter<T> getter) {
@ -119,8 +133,8 @@ public abstract class AbstractEntityDraft<E> implements Drafted<E> {
public Object unset(String key) {
if (key != null) {
Object value = backingMap.get(key);
backingMap.put(key, null);
Object value = mutationsMap.get(key);
mutationsMap.put(key, null);
return value;
}
return null;
@ -130,10 +144,18 @@ public abstract class AbstractEntityDraft<E> implements Drafted<E> {
return this.<T>reset(this.<T>methodNameFor(getter), desiredValue);
}
private <T> T fetch(String key) {
T value = (T) mutationsMap.get(key);
if (value == null) {
value = (T) valuesMap.get(key);
}
return value;
}
public <T> boolean reset(String key, T desiredValue) {
if (key != null && desiredValue != null) {
@SuppressWarnings("unchecked")
T currentValue = (T) backingMap.get(key);
T currentValue = (T) this.<T>fetch(key);
if (currentValue == null || !currentValue.equals(desiredValue)) {
set(key, desiredValue);
return true;
@ -144,7 +166,7 @@ public abstract class AbstractEntityDraft<E> implements Drafted<E> {
@Override
public Map<String, Object> toMap() {
return toMap(entityMap);
return toMap(valuesMap);
}
public Map<String, Object> toMap(Map<String, Object> entityMap) {
@ -155,21 +177,26 @@ public abstract class AbstractEntityDraft<E> implements Drafted<E> {
combined.put(e.getKey(), e.getValue());
}
} else {
combined = new HashMap<String, Object>(backingMap.size());
combined = new HashMap<String, Object>(mutationsMap.size());
}
for (String key : mutated()) {
combined.put(key, backingMap.get(key));
combined.put(key, mutationsMap.get(key));
}
return combined;
}
@Override
public Set<String> mutated() {
return backingMap.keySet();
return mutationsMap.keySet();
}
@Override
public Set<String> read() {
return readSet;
}
@Override
public String toString() {
return backingMap.toString();
return mutationsMap.toString();
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,7 +16,6 @@
*/
package net.helenus.core;
import brave.Tracer;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*;
import com.google.common.base.Stopwatch;
@ -25,7 +25,6 @@ import java.io.PrintStream;
import java.util.List;
import java.util.concurrent.Executor;
import net.helenus.core.cache.Facet;
import net.helenus.core.operation.Operation;
import net.helenus.mapping.value.ColumnValuePreparer;
import net.helenus.mapping.value.ColumnValueProvider;
import net.helenus.support.Either;
@ -43,6 +42,8 @@ public abstract class AbstractSessionOperations {
public abstract boolean isShowCql();
public abstract boolean showValues();
public abstract PrintStream getPrintStream();
public abstract Executor getExecutor();
@ -59,7 +60,6 @@ public abstract class AbstractSessionOperations {
public PreparedStatement prepare(RegularStatement statement) {
try {
logStatement(statement, false);
return currentSession().prepare(statement);
} catch (RuntimeException e) {
throw translateException(e);
@ -68,63 +68,48 @@ public abstract class AbstractSessionOperations {
public ListenableFuture<PreparedStatement> prepareAsync(RegularStatement statement) {
try {
logStatement(statement, false);
return currentSession().prepareAsync(statement);
} catch (RuntimeException e) {
throw translateException(e);
}
}
public ResultSet execute(Statement statement, boolean showValues) {
return execute(statement, null, null, showValues);
public ResultSet execute(Statement statement) {
return execute(statement, null, null);
}
public ResultSet execute(Statement statement, Stopwatch timer, boolean showValues) {
return execute(statement, null, timer, showValues);
public ResultSet execute(Statement statement, Stopwatch timer) {
return execute(statement, null, timer);
}
public ResultSet execute(Statement statement, UnitOfWork uow, boolean showValues) {
return execute(statement, uow, null, showValues);
public ResultSet execute(Statement statement, UnitOfWork uow) {
return execute(statement, uow, null);
}
public ResultSet execute(
Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
return executeAsync(statement, uow, timer, showValues).getUninterruptibly();
public ResultSet execute(Statement statement, UnitOfWork uow, Stopwatch timer) {
return executeAsync(statement, uow, timer).getUninterruptibly();
}
public ResultSetFuture executeAsync(Statement statement, boolean showValues) {
return executeAsync(statement, null, null, showValues);
public ResultSetFuture executeAsync(Statement statement) {
return executeAsync(statement, null, null);
}
public ResultSetFuture executeAsync(Statement statement, Stopwatch timer, boolean showValues) {
return executeAsync(statement, null, timer, showValues);
public ResultSetFuture executeAsync(Statement statement, Stopwatch timer) {
return executeAsync(statement, null, timer);
}
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, boolean showValues) {
return executeAsync(statement, uow, null, showValues);
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow) {
return executeAsync(statement, uow, null);
}
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, Stopwatch timer) {
try {
logStatement(statement, showValues);
return currentSession().executeAsync(statement);
} catch (RuntimeException e) {
throw translateException(e);
}
}
private void logStatement(Statement statement, boolean showValues) {
if (isShowCql()) {
printCql(Operation.queryString(statement, showValues));
} else if (LOG.isDebugEnabled()) {
LOG.info("CQL> " + Operation.queryString(statement, showValues));
}
}
public Tracer getZipkinTracer() {
return null;
}
public MetricRegistry getMetricRegistry() {
return null;
}
@ -144,9 +129,5 @@ public abstract class AbstractSessionOperations {
public void updateCache(Object pojo, List<Facet> facets) {}
void printCql(String cql) {
getPrintStream().println(cql);
}
public void cacheEvict(List<Facet> facets) {}
}

View file

@ -1,440 +0,0 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core;
import static net.helenus.core.HelenusSession.deleted;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ResultSet;
import com.diffplug.common.base.Errors;
import com.google.common.base.Stopwatch;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.google.common.collect.TreeTraverser;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.operation.AbstractOperation;
import net.helenus.core.operation.BatchOperation;
import net.helenus.support.Either;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public abstract class AbstractUnitOfWork<E extends Exception>
implements UnitOfWork<E>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractUnitOfWork.class);
private final List<AbstractUnitOfWork<E>> nested = new ArrayList<>();
private final HelenusSession session;
private final AbstractUnitOfWork<E> parent;
private final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
protected String purpose;
protected List<String> nestedPurposes = new ArrayList<String>();
protected String info;
protected int cacheHits = 0;
protected int cacheMisses = 0;
protected int databaseLookups = 0;
protected Stopwatch elapsedTime;
protected Map<String, Double> databaseTime = new HashMap<>();
protected double cacheLookupTime = 0.0;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
private boolean aborted = false;
private boolean committed = false;
private long committedAt = 0L;
private BatchOperation batch;
protected AbstractUnitOfWork(HelenusSession session, AbstractUnitOfWork<E> parent) {
Objects.requireNonNull(session, "containing session cannot be null");
this.session = session;
this.parent = parent;
}
@Override
public void addDatabaseTime(String name, Stopwatch amount) {
Double time = databaseTime.get(name);
if (time == null) {
databaseTime.put(name, (double) amount.elapsed(TimeUnit.MICROSECONDS));
} else {
databaseTime.put(name, time + amount.elapsed(TimeUnit.MICROSECONDS));
}
}
@Override
public void addCacheLookupTime(Stopwatch amount) {
cacheLookupTime += amount.elapsed(TimeUnit.MICROSECONDS);
}
@Override
public void addNestedUnitOfWork(UnitOfWork<E> uow) {
synchronized (nested) {
nested.add((AbstractUnitOfWork<E>) uow);
}
}
@Override
public synchronized UnitOfWork<E> begin() {
if (LOG.isInfoEnabled()) {
elapsedTime = Stopwatch.createStarted();
}
// log.record(txn::start)
return this;
}
@Override
public String getPurpose() {
return purpose;
}
@Override
public UnitOfWork setPurpose(String purpose) {
this.purpose = purpose;
return this;
}
@Override
public void setInfo(String info) {
this.info = info;
}
@Override
public void recordCacheAndDatabaseOperationCount(int cache, int ops) {
if (cache > 0) {
cacheHits += cache;
} else {
cacheMisses += Math.abs(cache);
}
if (ops > 0) {
databaseLookups += ops;
}
}
public String logTimers(String what) {
double e = (double) elapsedTime.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double d = 0.0;
double c = cacheLookupTime / 1000.0;
double fc = (c / e) * 100.0;
String database = "";
if (databaseTime.size() > 0) {
List<String> dbt = new ArrayList<>(databaseTime.size());
for (Map.Entry<String, Double> dt : databaseTime.entrySet()) {
double t = dt.getValue() / 1000.0;
d += t;
dbt.add(String.format("%s took %,.3fms %,2.2f%%", dt.getKey(), t, (t / e) * 100.0));
}
double fd = (d / e) * 100.0;
database =
String.format(
", %d quer%s (%,.3fms %,2.2f%% - %s)",
databaseLookups, (databaseLookups > 1) ? "ies" : "y", d, fd, String.join(", ", dbt));
}
String cache = "";
if (cacheLookupTime > 0) {
int cacheLookups = cacheHits + cacheMisses;
cache =
String.format(
" with %d cache lookup%s (%,.3fms %,2.2f%% - %,d hit, %,d miss)",
cacheLookups, cacheLookups > 1 ? "s" : "", c, fc, cacheHits, cacheMisses);
}
String da = "";
if (databaseTime.size() > 0 || cacheLookupTime > 0) {
double dat = d + c;
double daf = (dat / e) * 100;
da =
String.format(
" consuming %,.3fms for data access, or %,2.2f%% of total UOW time.", dat, daf);
}
String x = nestedPurposes.stream().distinct().collect(Collectors.joining(", "));
String n =
nested
.stream()
.map(uow -> String.valueOf(uow.hashCode()))
.collect(Collectors.joining(", "));
String s =
String.format(
Locale.US,
"UOW(%s%s) %s in %,.3fms%s%s%s%s%s%s",
hashCode(),
(nested.size() > 0 ? ", [" + n + "]" : ""),
what,
e,
cache,
database,
da,
(purpose == null ? "" : " " + purpose),
(nestedPurposes.isEmpty()) ? "" : ", " + x,
(info == null) ? "" : " " + info);
return s;
}
private void applyPostCommitFunctions() {
if (!postCommit.isEmpty()) {
for (CommitThunk f : postCommit) {
f.apply();
}
}
if (LOG.isInfoEnabled()) {
LOG.info(logTimers("committed"));
}
}
@Override
public Optional<Object> cacheLookup(List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets);
Optional<Object> result = Optional.empty();
for (Facet facet : facets) {
if (!facet.fixed()) {
String columnName = facet.name() + "==" + facet.value();
Either<Object, List<Facet>> eitherValue = cache.get(tableName, columnName);
if (eitherValue != null) {
Object value = deleted;
if (eitherValue.isLeft()) {
value = eitherValue.getLeft();
}
result = Optional.of(value);
break;
}
}
}
if (!result.isPresent()) {
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
if (parent != null) {
return parent.cacheLookup(facets);
}
}
return result;
}
@Override
public List<Facet> cacheEvict(List<Facet> facets) {
Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets);
String tableName = CacheUtil.schemaName(facets);
Optional<Object> optionalValue = cacheLookup(facets);
if (optionalValue.isPresent()) {
Object value = optionalValue.get();
for (Facet facet : facets) {
if (!facet.fixed()) {
String columnKey = facet.name() + "==" + facet.value();
// mark the value identified by the facet to `deleted`
cache.put(tableName, columnKey, deletedObjectFacets);
}
}
// look for other row/col pairs that referenced the same object, mark them
// `deleted`
cache
.columnKeySet()
.forEach(
columnKey -> {
Either<Object, List<Facet>> eitherCachedValue = cache.get(tableName, columnKey);
if (eitherCachedValue.isLeft()) {
Object cachedValue = eitherCachedValue.getLeft();
if (cachedValue == value) {
cache.put(tableName, columnKey, deletedObjectFacets);
String[] parts = columnKey.split("==");
facets.add(new Facet<String>(parts[0], parts[1]));
}
}
});
}
return facets;
}
@Override
public void cacheUpdate(Object value, List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets);
for (Facet facet : facets) {
if (!facet.fixed()) {
if (facet.alone()) {
String columnName = facet.name() + "==" + facet.value();
cache.put(tableName, columnName, Either.left(value));
}
}
}
}
public void batch(AbstractOperation s) {
if (batch == null) {
batch = new BatchOperation(session);
}
batch.add(s);
}
private Iterator<AbstractUnitOfWork<E>> getChildNodes() {
return nested.iterator();
}
/**
* Checks to see if the work performed between calling begin and now can be committed or not.
*
* @return a function from which to chain work that only happens when commit is successful
* @throws E when the work overlaps with other concurrent writers.
*/
public PostCommitFunction<Void, Void> commit() throws E, TimeoutException {
if (batch != null) {
committedAt = batch.sync(this);
//TODO(gburd) update cache with writeTime...
}
// All nested UnitOfWork should be committed (not aborted) before calls to
// commit, check.
boolean canCommit = true;
TreeTraverser<AbstractUnitOfWork<E>> traverser =
TreeTraverser.using(node -> node::getChildNodes);
for (AbstractUnitOfWork<E> uow : traverser.postOrderTraversal(this)) {
if (this != uow) {
canCommit &= (!uow.aborted && uow.committed);
}
}
// log.record(txn::provisionalCommit)
// examine log for conflicts in read-set and write-set between begin and
// provisional commit
// if (conflict) { throw new ConflictingUnitOfWorkException(this) }
// else return function so as to enable commit.andThen(() -> { do something iff
// commit was successful; })
if (canCommit) {
committed = true;
aborted = false;
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
elapsedTime.stop();
if (parent == null) {
// Apply all post-commit functions, this is the outter-most UnitOfWork.
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
uow.applyPostCommitFunctions();
});
// Merge our cache into the session cache.
session.mergeCache(cache);
return new PostCommitFunction(this, null);
} else {
// Merge cache and statistics into parent if there is one.
parent.mergeCache(cache);
parent.addBatched(batch);
if (purpose != null) {
parent.nestedPurposes.add(purpose);
}
parent.cacheHits += cacheHits;
parent.cacheMisses += cacheMisses;
parent.databaseLookups += databaseLookups;
parent.cacheLookupTime += cacheLookupTime;
for (Map.Entry<String, Double> dt : databaseTime.entrySet()) {
String name = dt.getKey();
if (parent.databaseTime.containsKey(name)) {
double t = parent.databaseTime.get(name);
parent.databaseTime.put(name, t + dt.getValue());
} else {
parent.databaseTime.put(name, dt.getValue());
}
}
}
}
// else {
// Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass);
// T object = ctor.newInstance(new Object[] { String message });
// }
return new PostCommitFunction(this, postCommit);
}
private void addBatched(BatchOperation batch) {
if (this.batch == null) {
this.batch = batch;
} else {
this.batch.addAll(batch);
}
}
/* Explicitly discard the work and mark it as as such in the log. */
public synchronized void abort() {
TreeTraverser<AbstractUnitOfWork<E>> traverser =
TreeTraverser.using(node -> node::getChildNodes);
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
uow.committed = false;
uow.aborted = true;
});
// log.record(txn::abort)
// cache.invalidateSince(txn::start time)
if (LOG.isInfoEnabled()) {
if (elapsedTime.isRunning()) {
elapsedTime.stop();
}
LOG.info(logTimers("aborted"));
}
}
private void mergeCache(Table<String, String, Either<Object, List<Facet>>> from) {
Table<String, String, Either<Object, List<Facet>>> to = this.cache;
from.rowMap()
.forEach(
(rowKey, columnMap) -> {
columnMap.forEach(
(columnKey, value) -> {
if (to.contains(rowKey, columnKey)) {
// TODO(gburd): merge case, preserve object identity
to.put(
rowKey,
columnKey,
Either.left(
CacheUtil.merge(
to.get(rowKey, columnKey).getLeft(),
from.get(rowKey, columnKey).getLeft())));
} else {
to.put(rowKey, columnKey, from.get(rowKey, columnKey));
}
});
});
}
public String describeConflicts() {
return "it's complex...";
}
@Override
public void close() throws E {
// Closing a AbstractUnitOfWork will abort iff we've not already aborted or
// committed this unit of work.
if (aborted == false && committed == false) {
abort();
}
}
public boolean hasAborted() {
return aborted;
}
public boolean hasCommitted() {
return committed;
}
public long committedAt() { return committedAt; }
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,6 +0,0 @@
package net.helenus.core;
@FunctionalInterface
public interface CommitThunk {
void apply();
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -79,7 +80,8 @@ public final class Filter<V> {
return new Filter<V>(node, postulate);
}
public static <V> Filter<V> create(Getter<V> getter, HelenusPropertyNode node, Postulate<V> postulate) {
public static <V> Filter<V> create(
Getter<V> getter, HelenusPropertyNode node, Postulate<V> postulate) {
Objects.requireNonNull(getter, "empty getter");
Objects.requireNonNull(postulate, "empty operator");
return new Filter<V>(node, postulate);

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -81,6 +82,10 @@ public final class Helenus {
return new SessionInitializer(session);
}
public static SessionInitializer init(Session session, String keyspace) {
return new SessionInitializer(session, keyspace);
}
public static SessionInitializer init(Session session) {
if (session == null) {

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,23 +18,19 @@ package net.helenus.core;
import static net.helenus.core.Query.eq;
import brave.Tracer;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*;
import com.google.common.collect.Table;
import java.io.Closeable;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheManager;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.SessionCache;
import net.helenus.core.cache.UnboundFacet;
import net.helenus.core.operation.*;
import net.helenus.core.reflect.Drafted;
@ -47,74 +44,69 @@ import net.helenus.support.*;
import net.helenus.support.Fun.Tuple1;
import net.helenus.support.Fun.Tuple2;
import net.helenus.support.Fun.Tuple6;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelenusSession extends AbstractSessionOperations implements Closeable {
public static final Object deleted = new Object();
private static final Logger LOG = LoggerFactory.getLogger(HelenusSession.class);
private static final Pattern classNameRegex =
Pattern.compile("^(?:\\w+\\.)+(?:(\\w+)|(\\w+)\\$.*)$");
private final Session session;
private final CodecRegistry registry;
private final ConsistencyLevel defaultConsistencyLevel;
private final boolean defaultQueryIdempotency;
private final MetricRegistry metricRegistry;
private final Tracer zipkinTracer;
private final PrintStream printStream;
private final Class<? extends UnitOfWork> unitOfWorkClass;
private final SessionRepository sessionRepository;
private final Executor executor;
private final boolean dropSchemaOnClose;
private final SessionCache<String, Object> sessionCache;
private final CacheManager cacheManager;
private final RowColumnValueProvider valueProvider;
private final StatementColumnValuePreparer valuePreparer;
private final Metadata metadata;
private volatile String usingKeyspace;
private volatile boolean showCql;
private volatile boolean showValues;
HelenusSession(
Session session,
String usingKeyspace,
CodecRegistry registry,
boolean showCql,
boolean showValues,
PrintStream printStream,
SessionRepositoryBuilder sessionRepositoryBuilder,
Executor executor,
boolean dropSchemaOnClose,
ConsistencyLevel consistencyLevel,
boolean defaultQueryIdempotency,
Class<? extends UnitOfWork> unitOfWorkClass,
SessionCache sessionCache,
MetricRegistry metricRegistry,
Tracer tracer) {
CacheManager cacheManager,
MetricRegistry metricRegistry) {
this.session = session;
this.registry = registry == null ? CodecRegistry.DEFAULT_INSTANCE : registry;
this.usingKeyspace =
Objects.requireNonNull(
usingKeyspace, "keyspace needs to be selected before creating session");
this.showCql = showCql;
this.showValues = showValues;
this.printStream = printStream;
this.sessionRepository = sessionRepositoryBuilder.build();
this.sessionRepository =
sessionRepositoryBuilder == null ? null : sessionRepositoryBuilder.build();
this.executor = executor;
this.dropSchemaOnClose = dropSchemaOnClose;
this.defaultConsistencyLevel = consistencyLevel;
this.defaultQueryIdempotency = defaultQueryIdempotency;
this.unitOfWorkClass = unitOfWorkClass;
this.metricRegistry = metricRegistry;
this.zipkinTracer = tracer;
if (sessionCache == null) {
this.sessionCache = SessionCache.<String, Object>defaultCache();
} else {
this.sessionCache = sessionCache;
}
this.cacheManager = cacheManager;
this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository);
this.metadata = session.getCluster().getMetadata();
this.metadata = session == null ? null : session.getCluster().getMetadata();
}
public UnitOfWork begin() {
return new UnitOfWork(this).begin();
}
public UnitOfWork begin(UnitOfWork parent) {
return new UnitOfWork(this, parent).begin();
}
@Override
@ -153,6 +145,20 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
return this;
}
public HelenusSession showQueryValuesInLog(boolean showValues) {
this.showValues = showValues;
return this;
}
public HelenusSession showQueryValuesInLog() {
this.showValues = true;
return this;
}
public boolean showValues() {
return showValues;
}
@Override
public Executor getExecutor() {
return executor;
@ -173,11 +179,6 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
return valuePreparer;
}
@Override
public Tracer getZipkinTracer() {
return zipkinTracer;
}
@Override
public MetricRegistry getMetricRegistry() {
return metricRegistry;
@ -196,10 +197,15 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
@Override
public Object checkCache(String tableName, List<Facet> facets) {
Object result = null;
for (String key : CacheUtil.flatKeys(tableName, facets)) {
result = sessionCache.get(key);
if (result != null) {
return result;
if (cacheManager != null) {
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
for (String key : CacheUtil.flatKeys(tableName, facets)) {
result = cache.get(key);
if (result != null) {
return result;
}
}
}
}
return null;
@ -207,8 +213,13 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
@Override
public void cacheEvict(List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets);
CacheUtil.flatKeys(tableName, facets).forEach(key -> sessionCache.invalidate(key));
if (cacheManager != null) {
String tableName = CacheUtil.schemaName(facets);
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
CacheUtil.flatKeys(tableName, facets).forEach(key -> cache.remove(key));
}
}
}
@Override
@ -246,146 +257,96 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
@Override
public void mergeCache(Table<String, String, Either<Object, List<Facet>>> uowCache) {
List<Object> items =
uowCache
.values()
.stream()
.filter(Either::isLeft)
.map(Either::getLeft)
.distinct()
.collect(Collectors.toList());
for (Object pojo : items) {
HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
Map<String, Object> valueMap =
pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
if (entity.isCacheable()) {
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet
.getProperties()
.forEach(
prop -> {
if (valueMap == null) {
Object value =
BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop);
binder.setValueForProperty(prop, value.toString());
} else {
Object v = valueMap.get(prop.getPropertyName());
if (v != null) {
binder.setValueForProperty(prop, v.toString());
if (cacheManager != null) {
List<Object> items =
uowCache
.values()
.stream()
.filter(Either::isLeft)
.map(Either::getLeft)
.distinct()
.collect(Collectors.toList());
for (Object pojo : items) {
HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
Map<String, Object> valueMap =
pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
if (entity.isCacheable()) {
List<Facet> boundFacets = new ArrayList<>();
for (Facet facet : entity.getFacets()) {
if (facet instanceof UnboundFacet) {
UnboundFacet unboundFacet = (UnboundFacet) facet;
UnboundFacet.Binder binder = unboundFacet.binder();
unboundFacet
.getProperties()
.forEach(
prop -> {
if (valueMap == null) {
Object value =
BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop);
binder.setValueForProperty(prop, value.toString());
} else {
Object v = valueMap.get(prop.getPropertyName());
if (v != null) {
binder.setValueForProperty(prop, v.toString());
}
}
}
});
if (binder.isBound()) {
boundFacets.add(binder.bind());
});
if (binder.isBound()) {
boundFacets.add(binder.bind());
}
} else {
boundFacets.add(facet);
}
} else {
boundFacets.add(facet);
}
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
String tableName = CacheUtil.schemaName(boundFacets);
replaceCachedFacetValues(pojo, tableName, facetCombinations);
}
List<String[]> facetCombinations = CacheUtil.flattenFacets(boundFacets);
String tableName = CacheUtil.schemaName(boundFacets);
replaceCachedFacetValues(pojo, tableName, facetCombinations);
}
}
List<List<Facet>> deletedFacetSets =
uowCache
.values()
.stream()
.filter(Either::isRight)
.map(Either::getRight)
.collect(Collectors.toList());
for (List<Facet> facets : deletedFacetSets) {
String tableName = CacheUtil.schemaName(facets);
List<String> keys = CacheUtil.flatKeys(tableName, facets);
keys.forEach(key -> sessionCache.invalidate(key));
List<List<Facet>> deletedFacetSets =
uowCache
.values()
.stream()
.filter(Either::isRight)
.map(Either::getRight)
.collect(Collectors.toList());
for (List<Facet> facets : deletedFacetSets) {
String tableName = CacheUtil.schemaName(facets);
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
List<String> keys = CacheUtil.flatKeys(tableName, facets);
keys.forEach(key -> cache.remove(key));
}
}
}
}
private void replaceCachedFacetValues(
Object pojo, String tableName, List<String[]> facetCombinations) {
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
if (pojo == null || pojo == HelenusSession.deleted) {
sessionCache.invalidate(cacheKey);
} else {
sessionCache.put(cacheKey, pojo);
if (cacheManager != null) {
for (String[] combination : facetCombinations) {
String cacheKey = tableName + "." + Arrays.toString(combination);
Cache<String, Object> cache = cacheManager.getCache(tableName);
if (cache != null) {
if (pojo == null || pojo == HelenusSession.deleted) {
cache.remove(cacheKey);
} else {
cache.put(cacheKey, pojo);
}
}
}
}
}
public CacheManager getCacheManager() {
return cacheManager;
}
public Metadata getMetadata() {
return metadata;
}
public UnitOfWork begin() {
return this.begin(null);
}
private String extractClassNameFromStackFrame(String classNameOnStack) {
String name = null;
Matcher m = classNameRegex.matcher(classNameOnStack);
if (m.find()) {
name = (m.group(1) != null) ? m.group(1) : ((m.group(2) != null) ? m.group(2) : name);
} else {
name = classNameOnStack;
}
return name;
}
public synchronized UnitOfWork begin(UnitOfWork parent) {
try {
Class<? extends UnitOfWork> clazz = unitOfWorkClass;
Constructor<? extends UnitOfWork> ctor =
clazz.getConstructor(HelenusSession.class, UnitOfWork.class);
UnitOfWork uow = ctor.newInstance(this, parent);
if (LOG.isInfoEnabled() && uow.getPurpose() == null) {
StringBuilder purpose = null;
int frame = 0;
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
String targetClassName = HelenusSession.class.getSimpleName();
String stackClassName = null;
do {
frame++;
stackClassName = extractClassNameFromStackFrame(trace[frame].getClassName());
} while (!stackClassName.equals(targetClassName) && frame < trace.length);
do {
frame++;
stackClassName = extractClassNameFromStackFrame(trace[frame].getClassName());
} while (stackClassName.equals(targetClassName) && frame < trace.length);
if (frame < trace.length) {
purpose =
new StringBuilder()
.append(trace[frame].getClassName())
.append(".")
.append(trace[frame].getMethodName())
.append("(")
.append(trace[frame].getFileName())
.append(":")
.append(trace[frame].getLineNumber())
.append(")");
uow.setPurpose(purpose.toString());
}
}
if (parent != null) {
parent.addNestedUnitOfWork(uow);
}
return uow.begin();
} catch (NoSuchMethodException
| InvocationTargetException
| InstantiationException
| IllegalAccessException e) {
throw new HelenusException(
String.format(
"Unable to instantiate %s as a UnitOfWork.", unitOfWorkClass.getSimpleName()),
e);
}
}
public <E> SelectOperation<E> select(E pojo) {
Objects.requireNonNull(
pojo, "supplied object must be a dsl for a registered entity but cannot be null");
@ -420,9 +381,17 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
return new SelectOperation<Fun.ArrayTuple>(this);
}
public SelectOperation<Row> selectAll(Class<?> entityClass) {
public <E> SelectOperation<E> selectAll(Class<E> entityClass) {
Objects.requireNonNull(entityClass, "entityClass is empty");
return new SelectOperation<Row>(this, Helenus.entity(entityClass));
HelenusEntity entity = Helenus.entity(entityClass);
return new SelectOperation<E>(
this,
entity,
(r) -> {
Map<String, Object> map = new ValueProviderMap(r, valueProvider, entity);
return (E) Helenus.map(entityClass, map);
});
}
public <E> SelectOperation<Row> selectAll(E pojo) {
@ -442,7 +411,8 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
Objects.requireNonNull(getter1, "field 1 is empty");
HelenusPropertyNode p1 = MappingUtil.resolveMappingProperty(getter1);
return new SelectOperation<Tuple1<V1>>(this, new Mappers.Mapper1<V1>(getValueProvider(), p1), p1);
return new SelectOperation<Tuple1<V1>>(
this, new Mappers.Mapper1<V1>(getValueProvider(), p1), p1);
}
public <V1, V2> SelectOperation<Tuple2<V1, V2>> select(Getter<V1> getter1, Getter<V2> getter2) {
@ -451,7 +421,8 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
HelenusPropertyNode p1 = MappingUtil.resolveMappingProperty(getter1);
HelenusPropertyNode p2 = MappingUtil.resolveMappingProperty(getter2);
return new SelectOperation<Fun.Tuple2<V1, V2>>(this, new Mappers.Mapper2<V1, V2>(getValueProvider(), p1, p2), p1, p2);
return new SelectOperation<Fun.Tuple2<V1, V2>>(
this, new Mappers.Mapper2<V1, V2>(getValueProvider(), p1, p2), p1, p2);
}
public <V1, V2, V3> SelectOperation<Fun.Tuple3<V1, V2, V3>> select(
@ -695,21 +666,21 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
if (entity != null) {
return new InsertOperation<T>(this, entity, entity.getMappingInterface(), true);
} else {
return this.<T>insert(pojo, null);
return this.<T>insert(pojo, null, null);
}
}
public <T> InsertOperation<T> insert(Drafted draft) {
return insert(draft.build(), draft.mutated());
return insert(draft.build(), draft.mutated(), draft.read());
}
private <T> InsertOperation<T> insert(T pojo, Set<String> mutations) {
private <T> InsertOperation<T> insert(T pojo, Set<String> mutations, Set<String> read) {
Objects.requireNonNull(pojo, "pojo is empty");
Class<?> iface = MappingUtil.getMappingInterface(pojo);
HelenusEntity entity = Helenus.entity(iface);
return new InsertOperation<T>(this, entity, pojo, mutations, true);
return new InsertOperation<T>(this, entity, pojo, mutations, read, true);
}
public InsertOperation<ResultSet> upsert() {
@ -721,11 +692,12 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
}
public <T> InsertOperation<T> upsert(Drafted draft) {
return this.<T>upsert((T) draft.build(), draft.mutated());
return this.<T>upsert((T) draft.build(), draft.mutated(), draft.read());
}
public <T> InsertOperation<T> upsert(T pojo) {
Objects.requireNonNull(pojo,
Objects.requireNonNull(
pojo,
"supplied object must be either an instance of the entity class or a dsl for it, but cannot be null");
HelenusEntity entity = null;
try {
@ -735,17 +707,17 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
if (entity != null) {
return new InsertOperation<T>(this, entity, entity.getMappingInterface(), false);
} else {
return this.<T>upsert(pojo, null);
return this.<T>upsert(pojo, null, null);
}
}
private <T> InsertOperation<T> upsert(T pojo, Set<String> mutations) {
private <T> InsertOperation<T> upsert(T pojo, Set<String> mutations, Set<String> read) {
Objects.requireNonNull(pojo, "pojo is empty");
Class<?> iface = MappingUtil.getMappingInterface(pojo);
HelenusEntity entity = Helenus.entity(iface);
return new InsertOperation<T>(this, entity, pojo, mutations, false);
return new InsertOperation<T>(this, entity, pojo, mutations, read, false);
}
public DeleteOperation delete() {
@ -766,6 +738,9 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
}
public void close() {
if (session == null) {
return;
}
if (session.isClosed()) {
return;
@ -796,11 +771,11 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
switch (entity.getType()) {
case TABLE:
execute(SchemaUtil.dropTable(entity), true);
execute(SchemaUtil.dropTable(entity));
break;
case UDT:
execute(SchemaUtil.dropUserType(entity), true);
execute(SchemaUtil.dropUserType(entity));
break;
default:

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -2,24 +2,73 @@ package net.helenus.core;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import net.helenus.support.CheckedRunnable;
public class PostCommitFunction<T, R> implements java.util.function.Function<T, R> {
public static final PostCommitFunction<Void, Void> NULL_ABORT = new PostCommitFunction<Void, Void>(null, null, null, false);
public static final PostCommitFunction<Void, Void> NULL_COMMIT = new PostCommitFunction<Void, Void>(null, null, null, true);
private final UnitOfWork uow;
private final List<CommitThunk> postCommit;
private final List<CheckedRunnable> commitThunks;
private final List<CheckedRunnable> abortThunks;
private Consumer<? super Throwable> exceptionallyThunk;
private boolean committed;
PostCommitFunction(UnitOfWork uow, List<CommitThunk> postCommit) {
this.uow = uow;
this.postCommit = postCommit;
PostCommitFunction(List<CheckedRunnable> postCommit, List<CheckedRunnable> abortThunks,
Consumer<? super Throwable> exceptionallyThunk,
boolean committed) {
this.commitThunks = postCommit;
this.abortThunks = abortThunks;
this.exceptionallyThunk = exceptionallyThunk;
this.committed = committed;
}
public void andThen(CommitThunk after) {
private void apply(CheckedRunnable... fns) {
try {
for (CheckedRunnable fn : fns) {
fn.run();
}
} catch (Throwable t) {
if (exceptionallyThunk != null) {
exceptionallyThunk.accept(t);
}
}
}
public PostCommitFunction<T, R> andThen(CheckedRunnable... after) {
Objects.requireNonNull(after);
if (postCommit == null) {
after.apply();
if (commitThunks == null) {
if (committed) {
apply(after);
}
} else {
postCommit.add(after);
for (CheckedRunnable fn : after) {
commitThunks.add(fn);
}
}
return this;
}
public PostCommitFunction<T, R> orElse(CheckedRunnable... after) {
Objects.requireNonNull(after);
if (abortThunks == null) {
if (!committed) {
apply(after);
}
} else {
for (CheckedRunnable fn : after) {
abortThunks.add(fn);
}
}
return this;
}
public PostCommitFunction<T, R> exceptionally(Consumer<? super Throwable> fn) {
Objects.requireNonNull(fn);
exceptionallyThunk = fn;
return this;
}
@Override

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -337,7 +338,7 @@ public final class SchemaUtil {
public static SchemaStatement createIndex(HelenusProperty prop) {
if (prop.caseSensitiveIndex()) {
return SchemaBuilder.createIndex(prop.getIndexName().get().toCql())
return SchemaBuilder.createIndex(indexName(prop))
.ifNotExists()
.onTable(prop.getEntity().getName().toCql())
.andColumn(prop.getColumnName().toCql());
@ -406,7 +407,7 @@ public final class SchemaUtil {
}
public static SchemaStatement dropIndex(HelenusProperty prop) {
return SchemaBuilder.dropIndex(prop.getIndexName().get().toCql()).ifExists();
return SchemaBuilder.dropIndex(indexName(prop)).ifExists();
}
private static SchemaBuilder.Direction mapDirection(OrderingDirection o) {
@ -465,4 +466,9 @@ public final class SchemaUtil {
}
return null;
}
private static String indexName(HelenusProperty prop) {
return prop.getEntity().getName().toCql() + "_" + prop.getIndexName().get().toCql();
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,7 +16,6 @@
*/
package net.helenus.core;
import brave.Tracer;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*;
import com.google.common.util.concurrent.MoreExecutors;
@ -25,7 +25,7 @@ import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import net.helenus.core.cache.SessionCache;
import javax.cache.CacheManager;
import net.helenus.core.reflect.DslExportable;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusEntityType;
@ -43,19 +43,26 @@ public final class SessionInitializer extends AbstractSessionOperations {
private CodecRegistry registry;
private String usingKeyspace;
private boolean showCql = false;
private boolean showValues = true;
private ConsistencyLevel consistencyLevel;
private boolean idempotent = true;
private boolean idempotent = false;
private MetricRegistry metricRegistry = new MetricRegistry();
private Tracer zipkinTracer;
private PrintStream printStream = System.out;
private Executor executor = MoreExecutors.directExecutor();
private Class<? extends UnitOfWork> unitOfWorkClass = UnitOfWorkImpl.class;
private SessionRepositoryBuilder sessionRepository;
private boolean dropUnusedColumns = false;
private boolean dropUnusedIndexes = false;
private KeyspaceMetadata keyspaceMetadata;
private AutoDdl autoDdl = AutoDdl.UPDATE;
private SessionCache sessionCache = null;
private CacheManager cacheManager = null;
SessionInitializer(Session session, String keyspace) {
this.session = session;
this.usingKeyspace = keyspace;
if (session != null) {
this.sessionRepository = new SessionRepositoryBuilder(session);
}
}
SessionInitializer(Session session) {
this.session = Objects.requireNonNull(session, "empty session");
@ -103,28 +110,32 @@ public final class SessionInitializer extends AbstractSessionOperations {
return this;
}
public SessionInitializer showQueryValuesInLog(boolean showValues) {
this.showValues = showValues;
return this;
}
public SessionInitializer showQueryValuesInLog() {
this.showValues = true;
return this;
}
public boolean showValues() {
return showValues;
}
public SessionInitializer metricRegistry(MetricRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
return this;
}
public SessionInitializer zipkinTracer(Tracer tracer) {
this.zipkinTracer = tracer;
return this;
}
public SessionInitializer setUnitOfWorkClass(Class<? extends UnitOfWork> e) {
this.unitOfWorkClass = e;
return this;
}
public SessionInitializer consistencyLevel(ConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}
public SessionInitializer setSessionCache(SessionCache sessionCache) {
this.sessionCache = sessionCache;
public SessionInitializer setCacheManager(CacheManager cacheManager) {
this.cacheManager = cacheManager;
return this;
}
@ -132,6 +143,11 @@ public final class SessionInitializer extends AbstractSessionOperations {
return consistencyLevel;
}
public SessionInitializer setOperationsIdempotentByDefault() {
this.idempotent = true;
return this;
}
public SessionInitializer idempotentQueryExecution(boolean idempotent) {
this.idempotent = idempotent;
return this;
@ -233,8 +249,10 @@ public final class SessionInitializer extends AbstractSessionOperations {
}
public SessionInitializer use(String keyspace) {
session.execute(SchemaUtil.use(keyspace, false));
this.usingKeyspace = keyspace;
if (session != null) {
session.execute(SchemaUtil.use(keyspace, false));
this.usingKeyspace = keyspace;
}
return this;
}
@ -255,16 +273,15 @@ public final class SessionInitializer extends AbstractSessionOperations {
usingKeyspace,
registry,
showCql,
showValues,
printStream,
sessionRepository,
executor,
autoDdl == AutoDdl.CREATE_DROP,
consistencyLevel,
idempotent,
unitOfWorkClass,
sessionCache,
metricRegistry,
zipkinTracer);
cacheManager,
metricRegistry);
}
private void initialize() {
@ -281,10 +298,16 @@ public final class SessionInitializer extends AbstractSessionOperations {
}
DslExportable dsl = (DslExportable) Helenus.dsl(iface);
dsl.setCassandraMetadataForHelenusSession(session.getCluster().getMetadata());
sessionRepository.add(dsl);
if (session != null) {
dsl.setCassandraMetadataForHelenusSession(session.getCluster().getMetadata());
}
if (sessionRepository != null) {
sessionRepository.add(dsl);
}
});
if (session == null) return;
TableOperations tableOps = new TableOperations(this, dropUnusedColumns, dropUnusedIndexes);
UserTypeOperations userTypeOps = new UserTypeOperations(this, dropUnusedColumns);

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -35,12 +36,12 @@ public final class TableOperations {
}
public void createTable(HelenusEntity entity) {
sessionOps.execute(SchemaUtil.createTable(entity), true);
sessionOps.execute(SchemaUtil.createTable(entity));
executeBatch(SchemaUtil.createIndexes(entity));
}
public void dropTable(HelenusEntity entity) {
sessionOps.execute(SchemaUtil.dropTable(entity), true);
sessionOps.execute(SchemaUtil.dropTable(entity));
}
public void validateTable(TableMetadata tmd, HelenusEntity entity) {
@ -79,17 +80,14 @@ public final class TableOperations {
public void createView(HelenusEntity entity) {
sessionOps.execute(
SchemaUtil.createMaterializedView(
sessionOps.usingKeyspace(), entity.getName().toCql(), entity),
true);
// executeBatch(SchemaUtil.createIndexes(entity)); NOTE: Unfortunately C* 3.10
// does not yet support 2i on materialized views.
sessionOps.usingKeyspace(), entity.getName().toCql(), entity));
// executeBatch(SchemaUtil.createIndexes(entity)); NOTE: Unfortunately C* 3.10 does not yet support 2i on materialized views.
}
public void dropView(HelenusEntity entity) {
sessionOps.execute(
SchemaUtil.dropMaterializedView(
sessionOps.usingKeyspace(), entity.getName().toCql(), entity),
true);
sessionOps.usingKeyspace(), entity.getName().toCql(), entity));
}
public void updateView(TableMetadata tmd, HelenusEntity entity) {
@ -104,9 +102,6 @@ public final class TableOperations {
private void executeBatch(List<SchemaStatement> list) {
list.forEach(
s -> {
sessionOps.execute(s, true);
});
list.forEach(s -> sessionOps.execute(s));
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,16 +16,138 @@
*/
package net.helenus.core;
import com.datastax.driver.core.Statement;
import static net.helenus.core.HelenusSession.deleted;
import com.google.common.base.Stopwatch;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import com.google.common.collect.TreeTraverser;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Configuration;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CompletionListener;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.MapCache;
import net.helenus.core.operation.AbstractOperation;
import net.helenus.core.operation.BatchOperation;
import net.helenus.mapping.MappingUtil;
import net.helenus.support.CheckedRunnable;
import net.helenus.support.Either;
import net.helenus.support.HelenusException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface UnitOfWork<X extends Exception> extends AutoCloseable {
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public class UnitOfWork implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(UnitOfWork.class);
public final UnitOfWork parent;
protected final List<UnitOfWork> nested = new ArrayList<>();
protected final Table<String, String, Either<Object, List<Facet>>> cache = HashBasedTable.create();
protected final EvictTrackingMapCache<String, Object> statementCache;
protected final HelenusSession session;
protected String purpose;
protected List<String> nestedPurposes = new ArrayList<String>();
protected String info;
protected int cacheHits = 0;
protected int cacheMisses = 0;
protected int databaseLookups = 0;
protected final Stopwatch elapsedTime;
protected Map<String, Double> databaseTime = new HashMap<>();
protected double cacheLookupTimeMSecs = 0.0;
private List<CheckedRunnable> commitThunks = new ArrayList<>();
private List<CheckedRunnable> abortThunks = new ArrayList<>();
private Consumer<? super Throwable> exceptionallyThunk;
private List<CompletableFuture<?>> asyncOperationFutures = new ArrayList<CompletableFuture<?>>();
private boolean aborted = false;
private boolean committed = false;
private long committedAt = 0L;
private BatchOperation batch;
public UnitOfWork(HelenusSession session) {
this(session, null);
}
public UnitOfWork(HelenusSession session, UnitOfWork parent) {
Objects.requireNonNull(session, "containing session cannot be null");
this.parent = parent;
if (parent != null) {
parent.addNestedUnitOfWork(this);
}
this.session = session;
CacheLoader<String, Object> cacheLoader = null;
if (parent != null) {
cacheLoader =
new CacheLoader<String, Object>() {
Cache<String, Object> cache = parent.getCache();
@Override
public Object load(String key) throws CacheLoaderException {
return cache.get(key);
}
@Override
public Map<String, Object> loadAll(Iterable<? extends String> keys)
throws CacheLoaderException {
Map<String, Object> kvp = new HashMap<String, Object>();
for (String key : keys) {
kvp.put(key, cache.get(key));
}
return kvp;
}
};
}
this.elapsedTime = Stopwatch.createUnstarted();
this.statementCache = new EvictTrackingMapCache<String, Object>(null, "UOW(" + hashCode() + ")", cacheLoader, true);
}
public void addDatabaseTime(String name, Stopwatch amount) {
Double time = databaseTime.get(name);
if (time == null) {
databaseTime.put(name, (double) amount.elapsed(TimeUnit.MICROSECONDS));
} else {
databaseTime.put(name, time + amount.elapsed(TimeUnit.MICROSECONDS));
}
}
public void addCacheLookupTime(Stopwatch amount) {
cacheLookupTimeMSecs += amount.elapsed(TimeUnit.MICROSECONDS);
}
public void addNestedUnitOfWork(UnitOfWork uow) {
synchronized (nested) {
nested.add(uow);
}
}
/**
* Marks the beginning of a transactional section of work. Will write a
@ -32,48 +155,662 @@ public interface UnitOfWork<X extends Exception> extends AutoCloseable {
*
* @return the handle used to commit or abort the work.
*/
UnitOfWork<X> begin();
public synchronized UnitOfWork begin() {
elapsedTime.start();
// log.record(txn::start)
return this;
}
void addNestedUnitOfWork(UnitOfWork<X> uow);
public String getPurpose() {
return purpose;
}
public UnitOfWork setPurpose(String purpose) {
this.purpose = purpose;
return this;
}
public void addFuture(CompletableFuture<?> future) {
asyncOperationFutures.add(future);
}
public void setInfo(String info) {
this.info = info;
}
public void recordCacheAndDatabaseOperationCount(int cache, int ops) {
if (cache > 0) {
cacheHits += cache;
} else {
cacheMisses += Math.abs(cache);
}
if (ops > 0) {
databaseLookups += ops;
}
}
public String logTimers(String what) {
double e = (double) elapsedTime.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
double d = 0.0;
double c = cacheLookupTimeMSecs / 1000.0;
double fc = (c / e) * 100.0;
String database = "";
if (databaseTime.size() > 0) {
List<String> dbt = new ArrayList<>(databaseTime.size());
for (Map.Entry<String, Double> dt : databaseTime.entrySet()) {
double t = dt.getValue() / 1000.0;
d += t;
dbt.add(String.format("%s took %,.3fms %,2.2f%%", dt.getKey(), t, (t / e) * 100.0));
}
double fd = (d / e) * 100.0;
database =
String.format(
", %d quer%s (%,.3fms %,2.2f%% - %s)",
databaseLookups, (databaseLookups > 1) ? "ies" : "y", d, fd, String.join(", ", dbt));
}
String cache = "";
if (cacheLookupTimeMSecs > 0) {
int cacheLookups = cacheHits + cacheMisses;
cache =
String.format(
" with %d cache lookup%s (%,.3fms %,2.2f%% - %,d hit, %,d miss)",
cacheLookups, cacheLookups > 1 ? "s" : "", c, fc, cacheHits, cacheMisses);
}
String da = "";
if (databaseTime.size() > 0 || cacheLookupTimeMSecs > 0) {
double dat = d + c;
double daf = (dat / e) * 100;
da =
String.format(
" consuming %,.3fms for data access, or %,2.2f%% of total UOW time.", dat, daf);
}
String x = nestedPurposes.stream().distinct().collect(Collectors.joining(", "));
String n =
nested
.stream()
.map(uow -> String.valueOf(uow.hashCode()))
.collect(Collectors.joining(", "));
String s =
String.format(
Locale.US,
"UOW(%s%s) %s in %,.3fms%s%s%s%s%s%s",
hashCode(),
(nested.size() > 0 ? ", [" + n + "]" : ""),
what,
e,
cache,
database,
da,
(purpose == null ? "" : " " + purpose),
(nestedPurposes.isEmpty()) ? "" : ", " + x,
(info == null) ? "" : " " + info);
return s;
}
private void applyPostCommitFunctions(String what, List<CheckedRunnable> thunks, Consumer<? super Throwable> exceptionallyThunk) {
if (!thunks.isEmpty()) {
for (CheckedRunnable f : thunks) {
try {
f.run();
} catch (Throwable t) {
if (exceptionallyThunk != null) {
exceptionallyThunk.accept(t);
}
}
}
}
}
public Optional<Object> cacheLookup(List<Facet> facets) {
String tableName = CacheUtil.schemaName(facets);
Optional<Object> result = Optional.empty();
for (Facet facet : facets) {
if (!facet.fixed()) {
String columnName = facet.name() + "==" + facet.value();
Either<Object, List<Facet>> eitherValue = cache.get(tableName, columnName);
if (eitherValue != null) {
Object value = deleted;
if (eitherValue.isLeft()) {
value = eitherValue.getLeft();
}
return Optional.of(value);
}
}
}
// Be sure to check all enclosing UnitOfWork caches as well, we may be nested.
result = checkParentCache(facets);
if (result.isPresent()) {
Object r = result.get();
Class<?> iface = MappingUtil.getMappingInterface(r);
if (Helenus.entity(iface).isDraftable()) {
cacheUpdate(r, facets);
} else {
cacheUpdate(SerializationUtils.<Serializable>clone((Serializable) r), facets);
}
}
return result;
}
private Optional<Object> checkParentCache(List<Facet> facets) {
Optional<Object> result = Optional.empty();
if (parent != null) {
result = parent.checkParentCache(facets);
}
return result;
}
public List<Facet> cacheEvict(List<Facet> facets) {
Either<Object, List<Facet>> deletedObjectFacets = Either.right(facets);
String tableName = CacheUtil.schemaName(facets);
Optional<Object> optionalValue = cacheLookup(facets);
for (Facet facet : facets) {
if (!facet.fixed()) {
String columnKey = facet.name() + "==" + facet.value();
// mark the value identified by the facet to `deleted`
cache.put(tableName, columnKey, deletedObjectFacets);
}
}
// Now, look for other row/col pairs that referenced the same object, mark them
// `deleted` if the cache had a value before we added the deleted marker objects.
if (optionalValue.isPresent()) {
Object value = optionalValue.get();
cache
.columnKeySet()
.forEach(
columnKey -> {
Either<Object, List<Facet>> eitherCachedValue = cache.get(tableName, columnKey);
if (eitherCachedValue.isLeft()) {
Object cachedValue = eitherCachedValue.getLeft();
if (cachedValue == value) {
cache.put(tableName, columnKey, deletedObjectFacets);
String[] parts = columnKey.split("==");
facets.add(new Facet<String>(parts[0], parts[1]));
}
}
});
}
return facets;
}
public Cache<String, Object> getCache() {
return statementCache;
}
public Object cacheUpdate(Object value, List<Facet> facets) {
Object result = null;
String tableName = CacheUtil.schemaName(facets);
for (Facet facet : facets) {
if (!facet.fixed()) {
if (facet.alone()) {
String columnName = facet.name() + "==" + facet.value();
if (result == null) result = cache.get(tableName, columnName);
cache.put(tableName, columnName, Either.left(value));
}
}
}
return result;
}
public void batch(AbstractOperation s) {
if (batch == null) {
batch = new BatchOperation(session);
}
batch.add(s);
}
private Iterator<UnitOfWork> getChildNodes() {
return nested.iterator();
}
/**
* Checks to see if the work performed between calling begin and now can be committed or not.
*
* @return a function from which to chain work that only happens when commit is successful
* @throws X when the work overlaps with other concurrent writers.
* @throws HelenusException when the work overlaps with other concurrent writers.
*/
PostCommitFunction<Void, Void> commit() throws X, TimeoutException;
public synchronized PostCommitFunction<Void, Void> commit() throws HelenusException {
if (isDone()) {
return PostCommitFunction.NULL_ABORT;
}
// Only the outer-most UOW batches statements for commit time, execute them.
if (batch != null) {
committedAt = batch.sync(this); //TODO(gburd): update cache with writeTime...
}
// All nested UnitOfWork should be committed (not aborted) before calls to
// commit, check.
boolean canCommit = true;
TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
for (UnitOfWork uow : traverser.postOrderTraversal(this)) {
if (this != uow) {
canCommit &= (!uow.aborted && uow.committed);
}
}
if (!canCommit) {
if (parent == null) {
// Apply all post-commit abort functions, this is the outer-most UnitOfWork.
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("aborted", abortThunks, exceptionallyThunk);
});
elapsedTime.stop();
if (LOG.isInfoEnabled()) {
LOG.info(logTimers("aborted"));
}
}
return PostCommitFunction.NULL_ABORT;
} else {
committed = true;
aborted = false;
if (parent == null) {
// Apply all post-commit commit functions, this is the outer-most UnitOfWork.
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("committed", uow.commitThunks, exceptionallyThunk);
});
// Merge our statement cache into the session cache if it exists.
CacheManager cacheManager = session.getCacheManager();
if (cacheManager != null) {
for (Map.Entry<String, Object> entry :
(Set<Map.Entry<String, Object>>) statementCache.<Map>unwrap(Map.class).entrySet()) {
String[] keyParts = entry.getKey().split("\\.");
if (keyParts.length == 2) {
String cacheName = keyParts[0];
String key = keyParts[1];
if (!StringUtils.isBlank(cacheName) && !StringUtils.isBlank(key)) {
Cache<Object, Object> cache = cacheManager.getCache(cacheName);
if (cache != null) {
Object value = entry.getValue();
if (value == deleted) {
cache.remove(key);
} else {
cache.put(key.toString(), value);
}
}
}
}
}
}
// Merge our cache into the session cache.
session.mergeCache(cache);
// Spoil any lingering futures that may be out there.
asyncOperationFutures.forEach(
f ->
f.completeExceptionally(
new HelenusException(
"Futures must be resolved before their unit of work has committed/aborted.")));
elapsedTime.stop();
if (LOG.isInfoEnabled()) {
LOG.info(logTimers("committed"));
}
return PostCommitFunction.NULL_COMMIT;
} else {
// Merge cache and statistics into parent if there is one.
parent.statementCache.putAll(statementCache.<Map>unwrap(Map.class));
parent.statementCache.removeAll(statementCache.getDeletions());
parent.mergeCache(cache);
parent.addBatched(batch);
if (purpose != null) {
parent.nestedPurposes.add(purpose);
}
parent.cacheHits += cacheHits;
parent.cacheMisses += cacheMisses;
parent.databaseLookups += databaseLookups;
parent.cacheLookupTimeMSecs += cacheLookupTimeMSecs;
for (Map.Entry<String, Double> dt : databaseTime.entrySet()) {
String name = dt.getKey();
if (parent.databaseTime.containsKey(name)) {
double t = parent.databaseTime.get(name);
parent.databaseTime.put(name, t + dt.getValue());
} else {
parent.databaseTime.put(name, dt.getValue());
}
}
}
}
// TODO(gburd): hopefully we'll be able to detect conflicts here and so we'd want to...
// else {
// Constructor<T> ctor = clazz.getConstructor(conflictExceptionClass);
// T object = ctor.newInstance(new Object[] { String message });
// }
return new PostCommitFunction<Void, Void>(commitThunks, abortThunks, exceptionallyThunk, true);
}
private void addBatched(BatchOperation batchArg) {
if (batchArg != null) {
if (this.batch == null) {
this.batch = batchArg;
} else {
this.batch.addAll(batchArg);
}
}
}
/**
* Explicitly abort the work within this unit of work. Any nested aborted unit of work will
* trigger the entire unit of work to commit.
*/
void abort();
public synchronized void abort() {
if (!aborted) {
aborted = true;
boolean hasAborted();
// Spoil any pending futures created within the context of this unit of work.
asyncOperationFutures.forEach(
f ->
f.completeExceptionally(
new HelenusException(
"Futures must be resolved before their unit of work has committed/aborted.")));
boolean hasCommitted();
TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
traverser
.postOrderTraversal(this)
.forEach(
uow -> {
applyPostCommitFunctions("aborted", uow.abortThunks, exceptionallyThunk);
uow.abortThunks.clear();
});
long committedAt();
if (parent == null) {
if (elapsedTime.isRunning()) {
elapsedTime.stop();
}
if (LOG.isInfoEnabled()) {
LOG.info(logTimers("aborted"));
}
}
void batch(AbstractOperation operation);
// TODO(gburd): when we integrate the transaction support we'll need to...
// log.record(txn::abort)
// cache.invalidateSince(txn::start time)
}
}
Optional<Object> cacheLookup(List<Facet> facets);
private void mergeCache(Table<String, String, Either<Object, List<Facet>>> from) {
Table<String, String, Either<Object, List<Facet>>> to = this.cache;
from.rowMap()
.forEach(
(rowKey, columnMap) -> {
columnMap.forEach(
(columnKey, value) -> {
if (to.contains(rowKey, columnKey)) {
to.put(
rowKey,
columnKey,
Either.left(
CacheUtil.merge(
to.get(rowKey, columnKey).getLeft(),
from.get(rowKey, columnKey).getLeft())));
} else {
to.put(rowKey, columnKey, from.get(rowKey, columnKey));
}
});
});
}
void cacheUpdate(Object pojo, List<Facet> facets);
public boolean isDone() {
return aborted || committed;
}
List<Facet> cacheEvict(List<Facet> facets);
public String describeConflicts() {
return "it's complex...";
}
String getPurpose();
@Override
public void close() throws HelenusException {
// Closing a UnitOfWork will abort iff we've not already aborted or committed this unit of work.
if (aborted == false && committed == false) {
abort();
}
}
UnitOfWork setPurpose(String purpose);
public boolean hasAborted() {
return aborted;
}
void setInfo(String info);
public boolean hasCommitted() {
return committed;
}
void addDatabaseTime(String name, Stopwatch amount);
public long committedAt() {
return committedAt;
}
void addCacheLookupTime(Stopwatch amount);
private static class EvictTrackingMapCache<K, V> implements Cache<K, V> {
private final Set<K> deletes;
private final Cache<K, V> delegate;
// Cache > 0 means "cache hit", < 0 means cache miss.
void recordCacheAndDatabaseOperationCount(int cache, int database);
public EvictTrackingMapCache(CacheManager manager, String name, CacheLoader<K, V> cacheLoader,
boolean isReadThrough) {
deletes = Collections.synchronizedSet(new HashSet<>());
delegate = new MapCache<>(manager, name, cacheLoader, isReadThrough);
}
/** Non-interface method; should only be called by UnitOfWork when merging to an enclosing UnitOfWork. */
public Set<K> getDeletions() {
return new HashSet<>(deletes);
}
@Override
public V get(K key) {
if (deletes.contains(key)) {
return null;
}
return delegate.get(key);
}
@Override
public Map<K, V> getAll(Set<? extends K> keys) {
Set<? extends K> clonedKeys = new HashSet<>(keys);
clonedKeys.removeAll(deletes);
return delegate.getAll(clonedKeys);
}
@Override
public boolean containsKey(K key) {
if (deletes.contains(key)) {
return false;
}
return delegate.containsKey(key);
}
@Override
public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener listener) {
Set<? extends K> clonedKeys = new HashSet<>(keys);
clonedKeys.removeAll(deletes);
delegate.loadAll(clonedKeys, replaceExistingValues, listener);
}
@Override
public void put(K key, V value) {
if (deletes.contains(key)) {
deletes.remove(key);
}
delegate.put(key, value);
}
@Override
public V getAndPut(K key, V value) {
if (deletes.contains(key)) {
deletes.remove(key);
}
return delegate.getAndPut(key, value);
}
@Override
public void putAll(Map<? extends K, ? extends V> map) {
deletes.removeAll(map.keySet());
delegate.putAll(map);
}
@Override
public synchronized boolean putIfAbsent(K key, V value) {
if (!delegate.containsKey(key) && deletes.contains(key)) {
deletes.remove(key);
}
return delegate.putIfAbsent(key, value);
}
@Override
public boolean remove(K key) {
boolean removed = delegate.remove(key);
deletes.add(key);
return removed;
}
@Override
public boolean remove(K key, V value) {
boolean removed = delegate.remove(key, value);
if (removed) {
deletes.add(key);
}
return removed;
}
@Override
public V getAndRemove(K key) {
V value = delegate.getAndRemove(key);
deletes.add(key);
return value;
}
@Override
public void removeAll(Set<? extends K> keys) {
Set<? extends K> cloneKeys = new HashSet<>(keys);
delegate.removeAll(cloneKeys);
deletes.addAll(cloneKeys);
}
@Override
@SuppressWarnings("unchecked")
public synchronized void removeAll() {
Map<K, V> impl = delegate.unwrap(Map.class);
Set<K> keys = impl.keySet();
delegate.removeAll();
deletes.addAll(keys);
}
@Override
public void clear() {
delegate.clear();
// TODO(gburd): all parents too
deletes.clear();
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
if (deletes.contains(key)) {
return false;
}
return delegate.replace(key, oldValue, newValue);
}
@Override
public boolean replace(K key, V value) {
if (deletes.contains(key)) {
return false;
}
return delegate.replace(key, value);
}
@Override
public V getAndReplace(K key, V value) {
if (deletes.contains(key)) {
return null;
}
return delegate.getAndReplace(key, value);
}
@Override
public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
return delegate.getConfiguration(clazz);
}
@Override
public <T> T invoke(K key, EntryProcessor<K, V, T> processor, Object... arguments)
throws EntryProcessorException {
if (deletes.contains(key)) {
return null;
}
return delegate.invoke(key, processor, arguments);
}
@Override
public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> processor,
Object... arguments) {
Set<? extends K> clonedKeys = new HashSet<>(keys);
clonedKeys.removeAll(deletes);
return delegate.invokeAll(clonedKeys, processor, arguments);
}
@Override
public String getName() {
return delegate.getName();
}
@Override
public CacheManager getCacheManager() {
return delegate.getCacheManager();
}
@Override
public void close() {
delegate.close();
}
@Override
public boolean isClosed() {
return delegate.isClosed();
}
@Override
public <T> T unwrap(Class<T> clazz) {
return delegate.unwrap(clazz);
}
@Override
public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
delegate.registerCacheEntryListener(cacheEntryListenerConfiguration);
}
@Override
public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
delegate.deregisterCacheEntryListener(cacheEntryListenerConfiguration);
}
@Override
public Iterator<Entry<K, V>> iterator() {
return delegate.iterator();
}
}
}

View file

@ -1,26 +0,0 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core;
import net.helenus.support.HelenusException;
class UnitOfWorkImpl extends AbstractUnitOfWork<HelenusException> {
@SuppressWarnings("unchecked")
public UnitOfWorkImpl(HelenusSession session, UnitOfWork parent) {
super(session, (AbstractUnitOfWork<HelenusException>) parent);
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -33,12 +34,12 @@ public final class UserTypeOperations {
public void createUserType(HelenusEntity entity) {
sessionOps.execute(SchemaUtil.createUserType(entity), true);
sessionOps.execute(SchemaUtil.createUserType(entity));
}
public void dropUserType(HelenusEntity entity) {
sessionOps.execute(SchemaUtil.dropUserType(entity), true);
sessionOps.execute(SchemaUtil.dropUserType(entity));
}
public void validateUserType(UserType userType, HelenusEntity entity) {
@ -71,9 +72,6 @@ public final class UserTypeOperations {
private void executeBatch(List<SchemaStatement> list) {
list.forEach(
s -> {
sessionOps.execute(s, true);
});
list.forEach(s -> sessionOps.execute(s));
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,7 +31,9 @@ public class BoundFacet extends Facet<String> {
this.properties.put(property, value);
}
public Set<HelenusProperty> getProperties() { return properties.keySet(); }
public Set<HelenusProperty> getProperties() {
return properties.keySet();
}
public BoundFacet(String name, Map<HelenusProperty, Object> properties) {
super(

View file

@ -3,7 +3,15 @@ package net.helenus.core.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import net.helenus.core.Helenus;
import net.helenus.core.reflect.Entity;
import net.helenus.core.reflect.MapExportable;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusProperty;
import net.helenus.mapping.MappingUtil;
import net.helenus.mapping.value.BeanColumnValueProvider;
public class CacheUtil {
@ -18,7 +26,8 @@ public class CacheUtil {
return out;
}
private static void kCombinations(List<String> items, int n, int k, String[] arr, List<String[]> out) {
private static void kCombinations(
List<String> items, int n, int k, String[] arr, List<String[]> out) {
if (k == 0) {
out.add(arr.clone());
} else {
@ -31,11 +40,12 @@ public class CacheUtil {
public static List<String> flatKeys(String table, List<Facet> facets) {
return flattenFacets(facets)
.stream()
.map(combination -> {
.stream()
.map(
combination -> {
return table + "." + Arrays.toString(combination);
})
.collect(Collectors.toList());
.collect(Collectors.toList());
}
public static List<String[]> flattenFacets(List<Facet> facets) {
@ -51,60 +61,136 @@ public class CacheUtil {
})
.collect(Collectors.toList()));
// TODO(gburd): rework so as to not generate the combinations at all rather than filter
facets = facets.stream()
facets =
facets
.stream()
.filter(f -> !f.fixed())
.filter(f -> !f.alone() || !f.combined())
.collect(Collectors.toList());
for (Facet facet : facets) {
combinations = combinations
combinations =
combinations
.stream()
.filter(combo -> {
// When used alone, this facet is not distinct so don't use it as a key.
if (combo.length == 1) {
if (!facet.alone() && combo[0].startsWith(facet.name() + "==")) {
return false;
}
} else {
if (!facet.combined()) {
for (String c : combo) {
// Don't use this facet in combination with others to create keys.
if (c.startsWith(facet.name() + "==")) {
.filter(
combo -> {
// When used alone, this facet is not distinct so don't use it as a key.
if (combo.length == 1) {
if (!facet.alone() && combo[0].startsWith(facet.name() + "==")) {
return false;
}
} else {
if (!facet.combined()) {
for (String c : combo) {
// Don't use this facet in combination with others to create keys.
if (c.startsWith(facet.name() + "==")) {
return false;
}
}
}
}
}
}
return true;
})
return true;
})
.collect(Collectors.toList());
}
return combinations;
}
/**
* Merge changed values in the map behind `from` into `to`.
*
* @param to
* @param from
* @return
*/
public static Object merge(Object to, Object from) {
if (to == from) {
/** Merge changed values in the map behind `from` into `to`. */
public static Object merge(Object t, Object f) {
HelenusEntity entity = Helenus.resolve(MappingUtil.getMappingInterface(t));
if (t == f) return t;
if (f == null) return t;
if (t == null) return f;
if (t instanceof MapExportable
&& t instanceof Entity
&& f instanceof MapExportable
&& f instanceof Entity) {
Entity to = (Entity) t;
Entity from = (Entity) f;
Map<String, Object> toValueMap = ((MapExportable) to).toMap();
Map<String, Object> fromValueMap = ((MapExportable) from).toMap();
for (HelenusProperty prop : entity.getOrderedProperties()) {
switch (prop.getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
continue;
default:
Object toVal = BeanColumnValueProvider.INSTANCE.getColumnValue(to, -1, prop, false);
Object fromVal = BeanColumnValueProvider.INSTANCE.getColumnValue(from, -1, prop, false);
String ttlKey = ttlKey(prop);
String writeTimeKey = writeTimeKey(prop);
int[] toTtlI = (int[]) toValueMap.get(ttlKey);
int toTtl = (toTtlI != null) ? toTtlI[0] : 0;
Long toWriteTime = (Long) toValueMap.get(writeTimeKey);
int[] fromTtlI = (int[]) fromValueMap.get(ttlKey);
int fromTtl = (fromTtlI != null) ? fromTtlI[0] : 0;
Long fromWriteTime = (Long) fromValueMap.get(writeTimeKey);
if (toVal != null) {
if (fromVal != null) {
if (toVal == fromVal) {
// Case: object identity
// Goal: ensure write time and ttl are also in sync
if (fromWriteTime != null
&& fromWriteTime != 0L
&& (toWriteTime == null || fromWriteTime > toWriteTime)) {
((MapExportable) to).put(writeTimeKey, fromWriteTime);
}
if (fromTtl > 0 && fromTtl > toTtl) {
((MapExportable) to).put(ttlKey, fromTtl);
}
} else if (fromWriteTime != null && fromWriteTime != 0L) {
// Case: to exists and from exists
// Goal: copy over from -> to iff from.writeTime > to.writeTime
if (toWriteTime != null && toWriteTime != 0L) {
if (fromWriteTime > toWriteTime) {
((MapExportable) to).put(prop.getPropertyName(), fromVal);
((MapExportable) to).put(writeTimeKey, fromWriteTime);
if (fromTtl > 0) {
((MapExportable) to).put(ttlKey, fromTtl);
}
}
} else {
((MapExportable) to).put(prop.getPropertyName(), fromVal);
((MapExportable) to).put(writeTimeKey, fromWriteTime);
if (fromTtl > 0) {
((MapExportable) to).put(ttlKey, fromTtl);
}
}
} else {
if (toWriteTime == null || toWriteTime == 0L) {
// Caution, entering grey area...
if (!toVal.equals(fromVal)) {
// dangerous waters here, values diverge without information that enables resolution,
// policy (for now) is to move value from -> to anyway.
((MapExportable) to).put(prop.getPropertyName(), fromVal);
if (fromTtl > 0) {
((MapExportable) to).put(ttlKey, fromTtl);
}
}
}
}
}
} else {
// Case: from exists, but to doesn't (it's null)
// Goal: copy over from -> to, include ttl and writeTime if present
if (fromVal != null) {
((MapExportable) to).put(prop.getPropertyName(), fromVal);
if (fromWriteTime != null && fromWriteTime != 0L) {
((MapExportable) to).put(writeTimeKey, fromWriteTime);
}
if (fromTtl > 0) {
((MapExportable) to).put(ttlKey, fromTtl);
}
}
}
}
}
return to;
} else {
return from;
}
/*
* // TODO(gburd): take ttl and writeTime into account when merging. Map<String,
* Object> toValueMap = to instanceof MapExportable ? ((MapExportable)
* to).toMap() : null; Map<String, Object> fromValueMap = to instanceof
* MapExportable ? ((MapExportable) from).toMap() : null;
*
* if (toValueMap != null && fromValueMap != null) { for (String key :
* fromValueMap.keySet()) { if (toValueMap.containsKey(key) &&
* toValueMap.get(key) != fromValueMap.get(key)) { toValueMap.put(key,
* fromValueMap.get(key)); } } } return to;
*/
return t;
}
public static String schemaName(List<Facet> facets) {
@ -115,9 +201,21 @@ public class CacheUtil {
.collect(Collectors.joining("."));
}
public static String writeTimeKey(String propertyName) {
return "_" + propertyName + "_writeTime";
public static String writeTimeKey(HelenusProperty prop) {
return writeTimeKey(prop.getColumnName().toCql(false));
}
public static String ttlKey(String propertyName) { return "_" + propertyName + "_ttl"; }
public static String ttlKey(HelenusProperty prop) {
return ttlKey(prop.getColumnName().toCql(false));
}
public static String writeTimeKey(String columnName) {
String key = "_" + columnName + "_writeTime";
return key.toLowerCase();
}
public static String ttlKey(String columnName) {
String key = "_" + columnName + "_ttl";
return key.toLowerCase();
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -58,6 +59,11 @@ public class Facet<T> {
this.combined = combined;
}
public boolean alone() { return alone; }
public boolean combined() { return combined; }
public boolean alone() {
return alone;
}
public boolean combined() {
return combined;
}
}

View file

@ -1,43 +0,0 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.cache;
import com.google.common.cache.Cache;
public class GuavaCache<K, V> implements SessionCache<K, V> {
final Cache<K, V> cache;
GuavaCache(Cache<K, V> cache) {
this.cache = cache;
}
@Override
public void invalidate(K key) {
cache.invalidate(key);
}
@Override
public V get(K key) {
return cache.getIfPresent(key);
}
@Override
public void put(K key, V value) {
cache.put(key, value);
}
}

View file

@ -0,0 +1,463 @@
package net.helenus.core.cache;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Configuration;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CompletionListener;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
public class MapCache<K, V> implements Cache<K, V> {
private final CacheManager manager;
private final String name;
private Map<K, V> map = new ConcurrentHashMap<>();
private Set<CacheEntryRemovedListener<K, V>> cacheEntryRemovedListeners = new HashSet<>();
private CacheLoader<K, V> cacheLoader = null;
private boolean isReadThrough = false;
private static class MapConfiguration<K, V> implements Configuration<K, V> {
private static final long serialVersionUID = 6093947542772516209L;
@Override
public Class<K> getKeyType() {
return null;
}
@Override
public Class<V> getValueType() {
return null;
}
@Override
public boolean isStoreByValue() {
return false;
}
}
public MapCache(
CacheManager manager, String name, CacheLoader<K, V> cacheLoader, boolean isReadThrough) {
this.manager = manager;
this.name = name;
this.cacheLoader = cacheLoader;
this.isReadThrough = isReadThrough;
}
/** {@inheritDoc} */
@Override
public V get(K key) {
V value = null;
synchronized (map) {
value = map.get(key);
if (value == null && isReadThrough && cacheLoader != null) {
V loadedValue = cacheLoader.load(key);
if (loadedValue != null) {
map.put(key, loadedValue);
value = loadedValue;
}
}
}
return value;
}
/** {@inheritDoc} */
@Override
public Map<K, V> getAll(Set<? extends K> keys) {
Map<K, V> result = null;
synchronized (map) {
result = new HashMap<K, V>(keys.size());
Iterator<? extends K> it = keys.iterator();
while (it.hasNext()) {
K key = it.next();
V value = map.get(key);
if (value != null) {
result.put(key, value);
it.remove();
}
}
if (keys.size() != 0 && isReadThrough && cacheLoader != null) {
Map<K, V> loadedValues = cacheLoader.loadAll(keys);
for (Map.Entry<K, V> entry : loadedValues.entrySet()) {
V v = entry.getValue();
if (v != null) {
K k = entry.getKey();
map.put(k, v);
result.put(k, v);
}
}
}
}
return result;
}
/** {@inheritDoc} */
@Override
public boolean containsKey(K key) {
return map.containsKey(key);
}
/** {@inheritDoc} */
@Override
public void loadAll(
Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionListener) {
if (cacheLoader != null) {
try {
synchronized (map) {
Map<K, V> loadedValues = cacheLoader.loadAll(keys);
for (Map.Entry<K, V> entry : loadedValues.entrySet()) {
V value = entry.getValue();
K key = entry.getKey();
if (value != null) {
boolean existsCurrently = map.containsKey(key);
if (!existsCurrently || replaceExistingValues) {
map.put(key, value);
keys.remove(key);
}
}
}
}
} catch (Exception e) {
if (completionListener != null) {
completionListener.onException(e);
}
}
}
if (completionListener != null) {
if (keys.isEmpty()) {
completionListener.onCompletion();
}
}
}
/** {@inheritDoc} */
@Override
public void put(K key, V value) {
map.put(key, value);
}
/** {@inheritDoc} */
@Override
public V getAndPut(K key, V value) {
V result = null;
synchronized (map) {
result = map.get(key);
if (result == null && isReadThrough && cacheLoader != null) {
V loadedValue = cacheLoader.load(key);
if (loadedValue != null) {
result = loadedValue;
}
}
map.put(key, value);
}
return result;
}
/** {@inheritDoc} */
@Override
public void putAll(Map<? extends K, ? extends V> map) {
synchronized (map) {
for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
this.map.put(entry.getKey(), entry.getValue());
}
}
}
/** {@inheritDoc} */
@Override
public boolean putIfAbsent(K key, V value) {
synchronized (map) {
if (!map.containsKey(key)) {
map.put(key, value);
return true;
} else {
return false;
}
}
}
/** {@inheritDoc} */
@Override
public boolean remove(K key) {
boolean removed = false;
synchronized (map) {
removed = map.remove(key) != null;
notifyRemovedListeners(key);
}
return removed;
}
/** {@inheritDoc} */
@Override
public boolean remove(K key, V oldValue) {
synchronized (map) {
V value = map.get(key);
if (value != null && oldValue.equals(value)) {
map.remove(key);
notifyRemovedListeners(key);
return true;
}
}
return false;
}
/** {@inheritDoc} */
@Override
public V getAndRemove(K key) {
synchronized (map) {
V oldValue = null;
oldValue = map.get(key);
map.remove(key);
notifyRemovedListeners(key);
return oldValue;
}
}
/** {@inheritDoc} */
@Override
public boolean replace(K key, V oldValue, V newValue) {
synchronized (map) {
V value = map.get(key);
if (value != null && oldValue.equals(value)) {
map.put(key, newValue);
return true;
}
}
return false;
}
/** {@inheritDoc} */
@Override
public boolean replace(K key, V value) {
synchronized (map) {
if (map.containsKey(key)) {
map.put(key, value);
return true;
}
}
return false;
}
/** {@inheritDoc} */
@Override
public V getAndReplace(K key, V value) {
synchronized (map) {
V oldValue = map.get(key);
if (value != null && value.equals(oldValue)) {
map.put(key, value);
return oldValue;
}
}
return null;
}
/** {@inheritDoc} */
@Override
public void removeAll(Set<? extends K> keys) {
synchronized (map) {
Iterator<? extends K> it = keys.iterator();
while (it.hasNext()) {
K key = it.next();
if (map.containsKey(key)) {
map.remove(key);
} else {
it.remove();
}
}
}
notifyRemovedListeners(keys);
}
/** {@inheritDoc} */
@Override
public void removeAll() {
synchronized (map) {
Set<K> keys = map.keySet();
map.clear();
notifyRemovedListeners(keys);
}
}
/** {@inheritDoc} */
@Override
public void clear() {
map.clear();
}
/** {@inheritDoc} */
@Override
public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
if (!MapConfiguration.class.isAssignableFrom(clazz)) {
throw new IllegalArgumentException();
}
return null;
}
/** {@inheritDoc} */
@Override
public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments)
throws EntryProcessorException {
// TODO
return null;
}
/** {@inheritDoc} */
@Override
public <T> Map<K, EntryProcessorResult<T>> invokeAll(
Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... arguments) {
synchronized (map) {
for (K key : keys) {
V value = map.get(key);
if (value != null) {
entryProcessor.process(
new MutableEntry<K, V>() {
@Override
public boolean exists() {
return map.containsKey(key);
}
@Override
public void remove() {
synchronized (map) {
V value = map.get(key);
if (value != null) {
map.remove(key);
notifyRemovedListeners(key);
}
}
}
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
return map.get(value);
}
@Override
public <T> T unwrap(Class<T> clazz) {
return null;
}
@Override
public void setValue(V value) {
map.put(key, value);
}
},
arguments);
}
}
}
return null;
}
/** {@inheritDoc} */
@Override
public String getName() {
return name;
}
/** {@inheritDoc} */
@Override
public CacheManager getCacheManager() {
return manager;
}
/** {@inheritDoc} */
@Override
public void close() {}
/** {@inheritDoc} */
@Override
public boolean isClosed() {
return false;
}
/** {@inheritDoc} */
@Override
@SuppressWarnings("unchecked")
public <T> T unwrap(Class<T> clazz) {
if (Map.class.isAssignableFrom(clazz)) {
return (T) map;
}
return null;
}
/** {@inheritDoc} */
@Override
public void registerCacheEntryListener(
CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
//cacheEntryRemovedListeners.add(cacheEntryListenerConfiguration.getCacheEntryListenerFactory().create());
}
/** {@inheritDoc} */
@Override
public void deregisterCacheEntryListener(
CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {}
/** {@inheritDoc} */
@Override
public Iterator<Entry<K, V>> iterator() {
synchronized (map) {
return new Iterator<Entry<K, V>>() {
Iterator<Map.Entry<K, V>> entries = map.entrySet().iterator();
@Override
public boolean hasNext() {
return entries.hasNext();
}
@Override
public Entry<K, V> next() {
Map.Entry<K, V> entry = entries.next();
return new Entry<K, V>() {
K key = entry.getKey();
V value = entry.getValue();
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
return value;
}
@Override
public <T> T unwrap(Class<T> clazz) {
return null;
}
};
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
private void notifyRemovedListeners(K key) {
// if (cacheEntryRemovedListeners != null) {
// cacheEntryRemovedListeners.forEach(listener -> listener.onRemoved())
// }
}
private void notifyRemovedListeners(Set<? extends K> keys) {}
}

View file

@ -1,60 +0,0 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface SessionCache<K, V> {
static final Logger LOG = LoggerFactory.getLogger(SessionCache.class);
static <K, V> SessionCache<K, V> defaultCache() {
GuavaCache<K, V> cache;
RemovalListener<K, V> listener =
new RemovalListener<K, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> n) {
if (n.wasEvicted()) {
String cause = n.getCause().name();
LOG.info(cause);
}
}
};
cache =
new GuavaCache<K, V>(
CacheBuilder.newBuilder()
.maximumSize(25_000)
.expireAfterAccess(5, TimeUnit.MINUTES)
.softValues()
.removalListener(listener)
.build());
return cache;
}
void invalidate(K key);
V get(K key);
void put(K key, V value);
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,6 +20,7 @@ import java.util.*;
import net.helenus.core.*;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.UnboundFacet;
import net.helenus.core.reflect.HelenusPropertyNode;
import net.helenus.mapping.HelenusProperty;
public abstract class AbstractFilterOperation<E, O extends AbstractFilterOperation<E, O>>
@ -108,6 +110,28 @@ public abstract class AbstractFilterOperation<E, O extends AbstractFilterOperati
ifFilters.add(filter);
}
@Override
protected boolean isIdempotentOperation() {
if (filters == null) {
return super.isIdempotentOperation();
}
return filters
.stream()
.anyMatch(
filter -> {
HelenusPropertyNode node = filter.getNode();
if (node != null) {
HelenusProperty prop = node.getProperty();
if (prop != null) {
return prop.isIdempotent();
}
}
return false;
})
|| super.isIdempotentOperation();
}
protected List<Facet> bindFacetValues(List<Facet> facets) {
if (facets == null) {
return new ArrayList<Facet>();

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -42,7 +43,7 @@ public abstract class AbstractFilterStreamOperation<
public <V> O where(Getter<V> getter, Operator operator, V val) {
addFilter(Filter.create(getter, operator, val));
if (val != null) addFilter(Filter.create(getter, operator, val));
return (O) this;
}
@ -63,7 +64,7 @@ public abstract class AbstractFilterStreamOperation<
public <V> O and(Getter<V> getter, Operator operator, V val) {
addFilter(Filter.create(getter, operator, val));
if (val != null) addFilter(Filter.create(getter, operator, val));
return (O) this;
}
@ -84,7 +85,7 @@ public abstract class AbstractFilterStreamOperation<
public <V> O onlyIf(Getter<V> getter, Operator operator, V val) {
addIfFilter(Filter.create(getter, operator, val));
if (val != null) addIfFilter(Filter.create(getter, operator, val));
return (O) this;
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,8 +21,6 @@ import com.datastax.driver.core.ResultSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import com.datastax.driver.core.Statement;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
@ -41,8 +40,9 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
public E sync() throws TimeoutException {
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = this.execute(sessionOps,null, traceContext, queryExecutionTimeout, queryTimeoutUnits,
showValues,false);
ResultSet resultSet =
this.execute(
sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
return transform(resultSet);
} finally {
context.stop();
@ -54,11 +54,8 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = execute(sessionOps, uow, traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
true);
ResultSet resultSet =
execute(sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, true);
E result = transform(resultSet);
return result;
} finally {
@ -79,13 +76,16 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
public CompletableFuture<E> async(UnitOfWork uow) {
if (uow == null) return async();
return CompletableFuture.<E>supplyAsync(
() -> {
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
CompletableFuture<E> f =
CompletableFuture.<E>supplyAsync(
() -> {
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
uow.addFuture(f);
return f;
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,16 +25,20 @@ import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.Helenus;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.mapping.MappingUtil;
import net.helenus.support.Fun;
import org.apache.commons.lang3.SerializationUtils;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends AbstractStatementOperation<E, O> {
@ -69,16 +74,22 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
if (updateCache) {
List<Facet> facets = bindFacetValues();
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
result = Optional.of(cacheResult);
updateCache = false;
sessionCacheHits.mark();
cacheHits.mark();
if (facets != null && facets.size() > 0) {
if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
result = Optional.of(cacheResult);
updateCache = false;
sessionCacheHits.mark();
cacheHits.mark();
} else {
sessionCacheMiss.mark();
cacheMiss.mark();
}
}
} else {
sessionCacheMiss.mark();
cacheMiss.mark();
//TODO(gburd): look in statement cache for results
}
}
@ -88,7 +99,6 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
this.execute(
sessionOps,
null,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
@ -100,7 +110,9 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
if (updateCache && result.isPresent()) {
E r = result.get();
if (!(r instanceof Fun)) {
Class<?> resultClass = r.getClass();
if (!(resultClass.getEnclosingClass() != null
&& resultClass.getEnclosingClass() == Fun.class)) {
List<Facet> facets = getFacets();
if (facets != null && facets.size() > 1) {
sessionOps.updateCache(r, facets);
@ -113,7 +125,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
}
}
public Optional<E> sync(UnitOfWork<?> uow) throws TimeoutException {
public Optional<E> sync(UnitOfWork uow) throws TimeoutException {
if (uow == null) return sync();
final Timer.Context context = requestLatency.time();
@ -127,31 +139,50 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
Stopwatch timer = Stopwatch.createStarted();
try {
List<Facet> facets = bindFacetValues();
if (facets != null) {
cachedResult = checkCache(uow, facets);
if (cachedResult != null) {
updateCache = false;
result = Optional.of(cachedResult);
uowCacheHits.mark();
cacheHits.mark();
if (facets != null && facets.size() > 0) {
if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
cachedResult = checkCache(uow, facets);
if (cachedResult != null) {
updateCache = false;
result = Optional.of(cachedResult);
uowCacheHits.mark();
cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0);
} else {
updateCache = true;
uowCacheMiss.mark();
if (isSessionCacheable()) {
String tableName = CacheUtil.schemaName(facets);
cachedResult = (E) sessionOps.checkCache(tableName, facets);
if (cachedResult != null) {
result = Optional.of(cachedResult);
sessionCacheHits.mark();
cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0);
} else {
uowCacheMiss.mark();
if (isSessionCacheable()) {
String tableName = CacheUtil.schemaName(facets);
cachedResult = (E) sessionOps.checkCache(tableName, facets);
if (cachedResult != null) {
Class<?> iface = MappingUtil.getMappingInterface(cachedResult);
if (Helenus.entity(iface).isDraftable()) {
result = Optional.of(cachedResult);
} else {
result =
Optional.of(
(E)
SerializationUtils.<Serializable>clone(
(Serializable) cachedResult));
}
updateCache = false;
sessionCacheHits.mark();
cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0);
} else {
updateCache = true;
sessionCacheMiss.mark();
cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0);
}
} else {
sessionCacheMiss.mark();
cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0);
updateCache = false;
}
}
} else {
//TODO(gburd): look in statement cache for results
updateCache = false; //true;
cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0);
}
} else {
updateCache = false;
@ -175,14 +206,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
// Formulate the query and execute it against the Cassandra cluster.
ResultSet resultSet =
execute(
sessionOps,
uow,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
true);
execute(sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, true);
// Transform the query result set into the desired shape.
result = transform(resultSet);
@ -214,15 +238,18 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
});
}
public CompletableFuture<Optional<E>> async(UnitOfWork<?> uow) {
public CompletableFuture<Optional<E>> async(UnitOfWork uow) {
if (uow == null) return async();
return CompletableFuture.<Optional<E>>supplyAsync(
() -> {
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
CompletableFuture<Optional<E>> f =
CompletableFuture.<Optional<E>>supplyAsync(
() -> {
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
uow.addFuture(f);
return f;
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,8 +16,6 @@
*/
package net.helenus.core.operation;
import brave.Tracer;
import brave.propagation.TraceContext;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
@ -47,10 +46,10 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
private ConsistencyLevel consistencyLevel;
private ConsistencyLevel serialConsistencyLevel;
private RetryPolicy retryPolicy;
private boolean idempotent = false;
private boolean enableTracing = false;
private long[] defaultTimestamp = null;
private int[] fetchSize = null;
protected boolean idempotent = false;
public AbstractStatementOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
@ -247,22 +246,16 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
statement.setFetchSize(fetchSize[0]);
}
if (idempotent) {
if (isIdempotentOperation()) {
statement.setIdempotent(true);
}
return statement;
}
public O zipkinContext(TraceContext traceContext) {
if (traceContext != null) {
Tracer tracer = this.sessionOps.getZipkinTracer();
if (tracer != null) {
this.traceContext = traceContext;
}
}
return (O) this;
@Override
protected boolean isIdempotentOperation() {
return idempotent;
}
public Statement statement() {
@ -312,7 +305,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
return ignoreCache;
}
protected E checkCache(UnitOfWork<?> uow, List<Facet> facets) {
protected E checkCache(UnitOfWork uow, List<Facet> facets) {
E result = null;
Optional<Object> optionalCachedResult = Optional.empty();
@ -326,7 +319,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
return result;
}
protected void cacheUpdate(UnitOfWork<?> uow, E pojo, List<Facet> identifyingFacets) {
protected Object cacheUpdate(UnitOfWork uow, E pojo, List<Facet> identifyingFacets) {
List<Facet> facets = new ArrayList<>();
Map<String, Object> valueMap =
pojo instanceof MapExportable ? ((MapExportable) pojo).toMap() : null;
@ -358,6 +351,6 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
}
// Cache the value (pojo), the statement key, and the fully bound facets.
uow.cacheUpdate(pojo, facets);
return uow.cacheUpdate(pojo, facets);
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,6 +25,7 @@ import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@ -31,10 +33,13 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.Helenus;
import net.helenus.core.UnitOfWork;
import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.mapping.MappingUtil;
import net.helenus.support.Fun;
import org.apache.commons.lang3.SerializationUtils;
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
extends AbstractStatementOperation<E, O> {
@ -70,16 +75,22 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
if (!ignoreCache() && isSessionCacheable()) {
List<Facet> facets = bindFacetValues();
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
resultStream = Stream.of(cacheResult);
updateCache = false;
sessionCacheHits.mark();
cacheHits.mark();
} else {
sessionCacheMiss.mark();
cacheMiss.mark();
if (facets != null && facets.size() > 0) {
if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
String tableName = CacheUtil.schemaName(facets);
cacheResult = (E) sessionOps.checkCache(tableName, facets);
if (cacheResult != null) {
resultStream = Stream.of(cacheResult);
updateCache = false;
sessionCacheHits.mark();
cacheHits.mark();
} else {
sessionCacheMiss.mark();
cacheMiss.mark();
}
} else {
//TODO(gburd): look in statement cache for results
}
}
}
@ -89,7 +100,6 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
this.execute(
sessionOps,
null,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
@ -105,8 +115,10 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
List<E> again = new ArrayList<>();
resultStream.forEach(
result -> {
if (!(result instanceof Fun)) {
sessionOps.updateCache(result, facets);
Class<?> resultClass = result.getClass();
if (!(resultClass.getEnclosingClass() != null
&& resultClass.getEnclosingClass() == Fun.class)) {
sessionOps.updateCache(result, facets);
}
again.add(result);
});
@ -133,31 +145,49 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
Stopwatch timer = Stopwatch.createStarted();
try {
List<Facet> facets = bindFacetValues();
if (facets != null) {
cachedResult = checkCache(uow, facets);
if (cachedResult != null) {
updateCache = false;
resultStream = Stream.of(cachedResult);
uowCacheHits.mark();
cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0);
} else {
updateCache = true;
uowCacheMiss.mark();
if (isSessionCacheable()) {
String tableName = CacheUtil.schemaName(facets);
cachedResult = (E) sessionOps.checkCache(tableName, facets);
if (cachedResult != null) {
resultStream = Stream.of(cachedResult);
sessionCacheHits.mark();
cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0);
if (facets != null && facets.size() > 0) {
if (facets.stream().filter(f -> !f.fixed()).distinct().count() > 0) {
cachedResult = checkCache(uow, facets);
if (cachedResult != null) {
updateCache = false;
resultStream = Stream.of(cachedResult);
uowCacheHits.mark();
cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0);
} else {
uowCacheMiss.mark();
if (isSessionCacheable()) {
String tableName = CacheUtil.schemaName(facets);
cachedResult = (E) sessionOps.checkCache(tableName, facets);
if (cachedResult != null) {
Class<?> iface = MappingUtil.getMappingInterface(cachedResult);
E result = null;
if (Helenus.entity(iface).isDraftable()) {
result = cachedResult;
} else {
result =
(E) SerializationUtils.<Serializable>clone((Serializable) cachedResult);
}
updateCache = false;
resultStream = Stream.of(result);
sessionCacheHits.mark();
cacheHits.mark();
uow.recordCacheAndDatabaseOperationCount(1, 0);
} else {
updateCache = true;
sessionCacheMiss.mark();
cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0);
}
} else {
sessionCacheMiss.mark();
cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0);
updateCache = false;
}
}
} else {
//TODO(gburd): look in statement cache for results
updateCache = false; //true;
cacheMiss.mark();
uow.recordCacheAndDatabaseOperationCount(-1, 0);
}
} else {
updateCache = false;
@ -173,14 +203,7 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
// Check to see if we fetched the object from the cache
if (resultStream == null) {
ResultSet resultSet =
execute(
sessionOps,
uow,
traceContext,
queryExecutionTimeout,
queryTimeoutUnits,
showValues,
true);
execute(sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, true);
resultStream = transform(resultSet);
}
@ -191,12 +214,15 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
List<E> again = new ArrayList<>();
List<Facet> facets = getFacets();
resultStream.forEach(
result -> {
if (result != deleted && !(result instanceof Fun)) {
cacheUpdate(uow, result, facets);
}
again.add(result);
});
result -> {
Class<?> resultClass = result.getClass();
if (result != deleted
&& !(resultClass.getEnclosingClass() != null
&& resultClass.getEnclosingClass() == Fun.class)) {
result = (E) cacheUpdate(uow, result, facets);
}
again.add(result);
});
resultStream = again.stream();
}
}
@ -220,13 +246,16 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public CompletableFuture<Stream<E>> async(UnitOfWork uow) {
if (uow == null) return async();
return CompletableFuture.<Stream<E>>supplyAsync(
() -> {
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
CompletableFuture<Stream<E>> f =
CompletableFuture.<Stream<E>>supplyAsync(
() -> {
try {
return sync();
} catch (TimeoutException ex) {
throw new CompletionException(ex);
}
});
uow.addFuture(f);
return f;
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,100 +17,124 @@
package net.helenus.core.operation;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.AtomicMonotonicTimestampGenerator;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.TimestampGenerator;
import com.google.common.base.Stopwatch;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.support.HelenusException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
import net.helenus.support.HelenusException;
public class BatchOperation extends Operation<Long> {
private BatchStatement batch = null;
private List<AbstractOperation<?, ?>> operations = new ArrayList<AbstractOperation<?, ?>>();
private boolean logged = true;
private long timestamp = 0L;
//TODO(gburd): find the way to get the driver's timestamp generator
private static final TimestampGenerator timestampGenerator =
new AtomicMonotonicTimestampGenerator();
public BatchOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
}
private final BatchStatement batch;
private List<AbstractOperation<?, ?>> operations = new ArrayList<AbstractOperation<?, ?>>();
private boolean logged = true;
public void add(AbstractOperation<?, ?> operation) {
operations.add(operation);
}
public BatchOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
batch = new BatchStatement();
}
@Override
public BatchStatement buildStatement(boolean cached) {
batch = new BatchStatement();
batch.addAll(operations.stream().map(o -> o.buildStatement(cached)).collect(Collectors.toList()));
batch.setConsistencyLevel(sessionOps.getDefaultConsistencyLevel());
timestamp = System.nanoTime();
batch.setDefaultTimestamp(timestamp);
return batch;
}
public void add(AbstractOperation<?, ?> operation) {
operations.add(operation);
}
public BatchOperation logged() {
logged = true;
return this;
}
@Override
public BatchStatement buildStatement(boolean cached) {
batch.addAll(
operations.stream().map(o -> o.buildStatement(cached)).collect(Collectors.toList()));
batch.setConsistencyLevel(sessionOps.getDefaultConsistencyLevel());
return batch;
}
public BatchOperation setLogged(boolean logStatements) {
logged = logStatements;
return this;
}
public BatchOperation logged() {
logged = true;
return this;
}
public Long sync() throws TimeoutException {
if (operations.size() == 0) return 0L;
final Timer.Context context = requestLatency.time();
try {
timestamp = System.nanoTime();
batch.setDefaultTimestamp(timestamp);
ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
if (!resultSet.wasApplied()) {
throw new HelenusException("Failed to apply batch.");
}
} finally {
context.stop();
public BatchOperation setLogged(boolean logStatements) {
logged = logStatements;
return this;
}
public Long sync() {
if (operations.size() == 0) return 0L;
final Timer.Context context = requestLatency.time();
try {
batch.setDefaultTimestamp(timestampGenerator.next());
ResultSet resultSet =
this.execute(
sessionOps, null, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
if (!resultSet.wasApplied()) {
throw new HelenusException("Failed to apply batch.");
}
return timestamp;
} catch (TimeoutException e) {
throw new HelenusException(e);
} finally {
context.stop();
}
return batch.getDefaultTimestamp();
}
public Long sync(UnitOfWork<?> uow) throws TimeoutException {
if (operations.size() == 0) return 0L;
if (uow == null)
return sync();
public Long sync(UnitOfWork uow) {
if (operations.size() == 0) return 0L;
if (uow == null) return sync();
final Timer.Context context = requestLatency.time();
final Stopwatch timer = Stopwatch.createStarted();
try {
uow.recordCacheAndDatabaseOperationCount(0, 1);
ResultSet resultSet = this.execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
if (!resultSet.wasApplied()) {
throw new HelenusException("Failed to apply batch.");
}
} finally {
context.stop();
timer.stop();
final Timer.Context context = requestLatency.time();
final Stopwatch timer = Stopwatch.createStarted();
try {
uow.recordCacheAndDatabaseOperationCount(0, 1);
batch.setDefaultTimestamp(timestampGenerator.next());
ResultSet resultSet =
this.execute(
sessionOps, uow, queryExecutionTimeout, queryTimeoutUnits, showValues, false);
if (!resultSet.wasApplied()) {
throw new HelenusException("Failed to apply batch.");
}
uow.addDatabaseTime("Cassandra", timer);
return timestamp;
} catch (TimeoutException e) {
throw new HelenusException(e);
} finally {
context.stop();
timer.stop();
}
uow.addDatabaseTime("Cassandra", timer);
return batch.getDefaultTimestamp();
}
public void addAll(BatchOperation batch) {
batch.operations.forEach(o -> this.operations.add(o));
}
public void addAll(BatchOperation batch) {
batch.operations.forEach(o -> this.operations.add(o));
}
public String toString() {
StringBuilder s = new StringBuilder();
s.append("BEGIN ");
if (!logged) { s.append("UN"); }
s.append("LOGGED BATCH; ");
s.append(operations.stream().map(o -> Operation.queryString(o.buildStatement(false), showValues)).collect(Collectors.joining(" ")));
s.append(" APPLY BATCH;");
return s.toString();
public String toString() {
return toString(true); //TODO(gburd): sessionOps.showQueryValues()
}
public String toString(boolean showValues) {
StringBuilder s = new StringBuilder();
s.append("BEGIN ");
if (!logged) {
s.append("UNLOGGED ");
}
s.append("BATCH ");
if (batch.getDefaultTimestamp() > -9223372036854775808L) {
s.append("USING TIMESTAMP ").append(String.valueOf(batch.getDefaultTimestamp())).append(" ");
}
s.append(
operations
.stream()
.map(o -> Operation.queryString(o.buildStatement(showValues), showValues))
.collect(Collectors.joining(" ")));
s.append(" APPLY BATCH;");
return s.toString();
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -134,6 +135,10 @@ public final class DeleteOperation extends AbstractFilterOperation<ResultSet, De
return bindFacetValues(getFacets());
}
protected boolean isIdempotentOperation() {
return true;
}
@Override
public ResultSet sync() throws TimeoutException {
ResultSet result = super.sync();

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,6 +20,10 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.Getter;
import net.helenus.core.Helenus;
@ -27,7 +32,6 @@ import net.helenus.core.cache.CacheUtil;
import net.helenus.core.cache.Facet;
import net.helenus.core.cache.UnboundFacet;
import net.helenus.core.reflect.DefaultPrimitiveTypes;
import net.helenus.core.reflect.Drafted;
import net.helenus.core.reflect.HelenusPropertyNode;
import net.helenus.core.reflect.MapExportable;
import net.helenus.mapping.HelenusEntity;
@ -38,16 +42,13 @@ import net.helenus.support.Fun;
import net.helenus.support.HelenusException;
import net.helenus.support.HelenusMappingException;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
public final class InsertOperation<T> extends AbstractOperation<T, InsertOperation<T>> {
private final List<Fun.Tuple2<HelenusPropertyNode, Object>> values = new ArrayList<Fun.Tuple2<HelenusPropertyNode, Object>>();
private final List<Fun.Tuple2<HelenusPropertyNode, Object>> values =
new ArrayList<Fun.Tuple2<HelenusPropertyNode, Object>>();
private final T pojo;
private final Class<?> resultType;
private final Set<String> readSet;
private HelenusEntity entity;
private boolean ifNotExists;
@ -58,34 +59,48 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
public InsertOperation(AbstractSessionOperations sessionOperations, boolean ifNotExists) {
super(sessionOperations);
this.ifNotExists = ifNotExists;
this.pojo = null;
this.readSet = null;
this.ifNotExists = ifNotExists;
this.resultType = ResultSet.class;
}
public InsertOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity, Class<?> resultType, boolean ifNotExists) {
public InsertOperation(
AbstractSessionOperations sessionOperations,
HelenusEntity entity,
Class<?> resultType,
boolean ifNotExists) {
super(sessionOperations);
this.ifNotExists = ifNotExists;
this.pojo = null;
this.readSet = null;
this.ifNotExists = ifNotExists;
this.resultType = resultType;
this.entity = entity;
}
public InsertOperation(AbstractSessionOperations sessionOperations, Class<?> resultType, boolean ifNotExists) {
public InsertOperation(
AbstractSessionOperations sessionOperations, Class<?> resultType, boolean ifNotExists) {
super(sessionOperations);
this.ifNotExists = ifNotExists;
this.pojo = null;
this.readSet = null;
this.ifNotExists = ifNotExists;
this.resultType = resultType;
}
public InsertOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity, T pojo,
Set<String> mutations, boolean ifNotExists) {
public InsertOperation(
AbstractSessionOperations sessionOperations,
HelenusEntity entity,
T pojo,
Set<String> mutations,
Set<String> read,
boolean ifNotExists) {
super(sessionOperations);
this.entity = entity;
this.pojo = pojo;
this.readSet = read;
this.entity = entity;
this.ifNotExists = ifNotExists;
this.resultType = entity.getMappingInterface();
@ -144,16 +159,28 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
@Override
public BuiltStatement buildStatement(boolean cached) {
List<HelenusEntity> entities = values.stream().map(t -> t._1.getProperty().getEntity()).distinct().collect(Collectors.toList());
List<HelenusEntity> entities =
values
.stream()
.map(t -> t._1.getProperty().getEntity())
.distinct()
.collect(Collectors.toList());
if (entities.size() != 1) {
throw new HelenusMappingException("you can insert only single entity at a time, found: "
+ entities.stream().map(e -> e.getMappingInterface().toString()).collect(Collectors.joining(", ")));
throw new HelenusMappingException(
"you can insert only single entity at a time, found: "
+ entities
.stream()
.map(e -> e.getMappingInterface().toString())
.collect(Collectors.joining(", ")));
}
HelenusEntity entity = entities.get(0);
if (this.entity != null) {
if (this.entity != entity) {
throw new HelenusMappingException("you can insert only single entity at a time, found: " +
this.entity.getMappingInterface().toString() + ", " + entity.getMappingInterface().toString());
throw new HelenusMappingException(
"you can insert only single entity at a time, found: "
+ this.entity.getMappingInterface().toString()
+ ", "
+ entity.getMappingInterface().toString());
}
} else {
this.entity = entity;
@ -188,52 +215,53 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
return insert;
}
private T newInstance(Class<?> iface) {
if (values.size() > 0) {
boolean immutable = iface.isAssignableFrom(Drafted.class);
Collection<HelenusProperty> properties = entity.getOrderedProperties();
Map<String, Object> backingMap = new HashMap<String, Object>(properties.size());
private T newInstance(Class<?> iface) {
if (values.size() > 0) {
boolean immutable = entity.isDraftable();
Collection<HelenusProperty> properties = entity.getOrderedProperties();
Map<String, Object> backingMap = new HashMap<String, Object>(properties.size());
// First, add all the inserted values into our new map.
values.forEach(t -> backingMap.put(t._1.getProperty().getPropertyName(), t._2));
// First, add all the inserted values into our new map.
values.forEach(t -> backingMap.put(t._1.getProperty().getPropertyName(), t._2));
// Then, fill in all the rest of the properties.
for (HelenusProperty prop : properties) {
String key = prop.getPropertyName();
if (backingMap.containsKey(key)) {
// Some values man need to be converted (e.g. from String to Enum). This is done
// within the BeanColumnValueProvider below.
Optional<Function<Object, Object>> converter = prop.getReadConverter(
sessionOps.getSessionRepository());
if (converter.isPresent()) {
backingMap.put(key, converter.get().apply(backingMap.get(key)));
}
} else {
// If we started this operation with an instance of this type, use values from
// that.
if (pojo != null) {
backingMap.put(key, BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, immutable));
} else {
// Otherwise we'll use default values for the property type if available.
Class<?> propType = prop.getJavaType();
if (propType.isPrimitive()) {
DefaultPrimitiveTypes type = DefaultPrimitiveTypes.lookup(propType);
if (type == null) {
throw new HelenusException("unknown primitive type " + propType);
}
backingMap.put(key, type.getDefaultValue());
}
}
}
// Then, fill in all the rest of the properties.
for (HelenusProperty prop : properties) {
String key = prop.getPropertyName();
if (backingMap.containsKey(key)) {
// Some values man need to be converted (e.g. from String to Enum). This is done
// within the BeanColumnValueProvider below.
Optional<Function<Object, Object>> converter =
prop.getReadConverter(sessionOps.getSessionRepository());
if (converter.isPresent()) {
backingMap.put(key, converter.get().apply(backingMap.get(key)));
}
} else {
// If we started this operation with an instance of this type, use values from
// that.
if (pojo != null) {
backingMap.put(
key, BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop, immutable));
} else {
// Otherwise we'll use default values for the property type if available.
Class<?> propType = prop.getJavaType();
if (propType.isPrimitive()) {
DefaultPrimitiveTypes type = DefaultPrimitiveTypes.lookup(propType);
if (type == null) {
throw new HelenusException("unknown primitive type " + propType);
}
backingMap.put(key, type.getDefaultValue());
}
// Lastly, create a new proxy object for the entity and return the new instance.
return (T) Helenus.map(iface, backingMap);
}
}
return null;
}
}
@Override
// Lastly, create a new proxy object for the entity and return the new instance.
return (T) Helenus.map(iface, backingMap);
}
return null;
}
@Override
public T transform(ResultSet resultSet) {
if ((ifNotExists == true) && (resultSet.wasApplied() == false)) {
throw new HelenusException("Statement was not applied due to consistency constraints");
@ -241,12 +269,12 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
Class<?> iface = entity.getMappingInterface();
if (resultType == iface) {
T o = newInstance(iface);
if (o == null) {
// Oddly, this insert didn't change anything so simply return the pojo.
return (T) pojo;
}
return o;
T o = newInstance(iface);
if (o == null) {
// Oddly, this insert didn't change anything so simply return the pojo.
return (T) pojo;
}
return o;
}
return (T) resultSet;
}
@ -265,36 +293,45 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
protected void adjustTtlAndWriteTime(MapExportable pojo) {
if (ttl != null || writeTime != 0L) {
List<String> propertyNames = values.stream()
List<String> columnNames =
values
.stream()
.map(t -> t._1.getProperty())
.filter(prop -> {
switch (prop.getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
return false;
default:
return true;
}
})
.filter(
prop -> {
switch (prop.getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
return false;
default:
return true;
}
})
.map(prop -> prop.getColumnName().toCql(false))
.collect(Collectors.toList());
if (propertyNames.size() > 0) {
if (columnNames.size() > 0) {
if (ttl != null) {
propertyNames.forEach(name -> pojo.put(CacheUtil.ttlKey(name), ttl));
columnNames.forEach(name -> pojo.put(CacheUtil.ttlKey(name), ttl));
}
if (writeTime != 0L) {
propertyNames.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), writeTime));
columnNames.forEach(name -> pojo.put(CacheUtil.writeTimeKey(name), writeTime));
}
}
}
}
@Override
protected boolean isIdempotentOperation() {
return values.stream().map(v -> v._1.getProperty()).allMatch(prop -> prop.isIdempotent())
|| super.isIdempotentOperation();
}
@Override
public T sync() throws TimeoutException {
T result = super.sync();
if (entity.isCacheable() && result != null) {
adjustTtlAndWriteTime((MapExportable)result);
adjustTtlAndWriteTime((MapExportable) result);
sessionOps.updateCache(result, bindFacetValues());
}
return result;
@ -321,10 +358,6 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
adjustTtlAndWriteTime((MapExportable) result);
}
cacheUpdate(uow, result, bindFacetValues());
} else {
if (entity.isCacheable()) {
sessionOps.cacheEvict(bindFacetValues());
}
}
return result;
}
@ -339,8 +372,8 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
if (resultType == iface) {
final T result = (pojo == null) ? newInstance(iface) : pojo;
if (result != null) {
adjustTtlAndWriteTime((MapExportable) result);
cacheUpdate(uow, result, bindFacetValues());
adjustTtlAndWriteTime((MapExportable) result);
cacheUpdate(uow, result, bindFacetValues());
}
uow.batch(this);
return (T) result;

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,9 +16,6 @@
*/
package net.helenus.core.operation;
import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
@ -42,8 +40,7 @@ public abstract class Operation<E> {
private static final Logger LOG = LoggerFactory.getLogger(Operation.class);
protected final AbstractSessionOperations sessionOps;
protected boolean showValues = false;
protected TraceContext traceContext;
protected boolean showValues;
protected long queryExecutionTimeout = 10;
protected TimeUnit queryTimeoutUnits = TimeUnit.SECONDS;
protected final Meter uowCacheHits;
@ -56,6 +53,7 @@ public abstract class Operation<E> {
Operation(AbstractSessionOperations sessionOperations) {
this.sessionOps = sessionOperations;
this.showValues = sessionOps.showValues();
MetricRegistry metrics = sessionOperations.getMetricRegistry();
if (metrics == null) {
metrics = new MetricRegistry();
@ -69,6 +67,10 @@ public abstract class Operation<E> {
this.requestLatency = metrics.timer("net.helenus.request-latency");
}
public static String queryString(BatchOperation operation, boolean includeValues) {
return operation.toString(includeValues);
}
public static String queryString(Statement statement, boolean includeValues) {
String query = null;
if (statement instanceof BuiltStatement) {
@ -91,82 +93,79 @@ public abstract class Operation<E> {
public ResultSet execute(
AbstractSessionOperations session,
UnitOfWork uow,
TraceContext traceContext,
long timeout,
TimeUnit units,
boolean showValues,
boolean cached)
throws TimeoutException {
// Start recording in a Zipkin sub-span our execution time to perform this operation.
Tracer tracer = session.getZipkinTracer();
Span span = null;
if (tracer != null && traceContext != null) {
span = tracer.newChild(traceContext);
Statement statement = options(buildStatement(cached));
if (session.isShowCql()) {
String stmt =
(this instanceof BatchOperation)
? queryString((BatchOperation) this, showValues)
: queryString(statement, showValues);
session.getPrintStream().println(stmt);
} else if (LOG.isDebugEnabled()) {
String stmt =
(this instanceof BatchOperation)
? queryString((BatchOperation) this, showValues)
: queryString(statement, showValues);
LOG.info("CQL> " + stmt);
}
Stopwatch timer = Stopwatch.createStarted();
try {
if (span != null) {
span.name("cassandra");
span.start();
}
Statement statement = options(buildStatement(cached));
Stopwatch timer = Stopwatch.createStarted();
try {
ResultSetFuture futureResultSet = session.executeAsync(statement, uow, timer, showValues);
if (uow != null) uow.recordCacheAndDatabaseOperationCount(0, 1);
ResultSet resultSet = futureResultSet.getUninterruptibly(timeout, units);
ColumnDefinitions columnDefinitions = resultSet.getColumnDefinitions();
if (LOG.isDebugEnabled()) {
ExecutionInfo ei = resultSet.getExecutionInfo();
Host qh = ei.getQueriedHost();
String oh =
ei.getTriedHosts()
.stream()
.map(Host::getAddress)
.map(InetAddress::toString)
.collect(Collectors.joining(", "));
ConsistencyLevel cl = ei.getAchievedConsistencyLevel();
int se = ei.getSpeculativeExecutions();
String warn = ei.getWarnings().stream().collect(Collectors.joining(", "));
String ri =
String.format(
"%s %s %s %s %s %s%sspec-retries: %d",
"server v" + qh.getCassandraVersion(),
qh.getAddress().toString(),
(oh != null && !oh.equals("")) ? " [tried: " + oh + "]" : "",
qh.getDatacenter(),
qh.getRack(),
(cl != null)
? (" consistency: "
+ cl.name()
+ (cl.isDCLocal() ? " DC " : "")
+ (cl.isSerial() ? " SC " : ""))
: "",
(warn != null && !warn.equals("")) ? ": " + warn : "",
se);
if (uow != null) uow.setInfo(ri);
else LOG.debug(ri);
ResultSetFuture futureResultSet = session.executeAsync(statement, uow, timer);
if (uow != null) uow.recordCacheAndDatabaseOperationCount(0, 1);
ResultSet resultSet = futureResultSet.getUninterruptibly(timeout, units);
ColumnDefinitions columnDefinitions = resultSet.getColumnDefinitions();
if (LOG.isDebugEnabled()) {
ExecutionInfo ei = resultSet.getExecutionInfo();
Host qh = ei.getQueriedHost();
String oh =
ei.getTriedHosts()
.stream()
.map(Host::getAddress)
.map(InetAddress::toString)
.collect(Collectors.joining(", "));
ConsistencyLevel cl = ei.getAchievedConsistencyLevel();
if (cl == null) {
cl = statement.getConsistencyLevel();
}
if (!resultSet.wasApplied()
&& !(columnDefinitions.size() > 1 || !columnDefinitions.contains("[applied]"))) {
throw new HelenusException("Operation Failed");
}
return resultSet;
} finally {
timer.stop();
if (uow != null) uow.addDatabaseTime("Cassandra", timer);
log(statement, uow, timer, showValues);
int se = ei.getSpeculativeExecutions();
String warn = ei.getWarnings().stream().collect(Collectors.joining(", "));
String ri =
String.format(
"%s %s ~%s %s %s%s%sspec-retries: %d",
"server v" + qh.getCassandraVersion(),
qh.getAddress().toString(),
(oh != null && !oh.equals("")) ? " [tried: " + oh + "]" : "",
qh.getDatacenter(),
qh.getRack(),
(cl != null)
? (" consistency: "
+ cl.name()
+ " "
+ (cl.isDCLocal() ? " DC " : "")
+ (cl.isSerial() ? " SC " : ""))
: "",
(warn != null && !warn.equals("")) ? ": " + warn : "",
se);
if (uow != null) uow.setInfo(ri);
else LOG.debug(ri);
}
if (!resultSet.wasApplied()
&& !(columnDefinitions.size() > 1 || !columnDefinitions.contains("[applied]"))) {
throw new HelenusException("Operation Failed");
}
return resultSet;
} finally {
if (span != null) {
span.finish();
}
timer.stop();
if (uow != null) uow.addDatabaseTime("Cassandra", timer);
log(statement, uow, timer, showValues);
}
}
@ -181,10 +180,15 @@ public abstract class Operation<E> {
timerString = String.format(" %s ", timer.toString());
}
LOG.info(
String.format("%s%s%s", uowString, timerString, Operation.queryString(statement, showValues)));
String.format(
"%s%s%s", uowString, timerString, Operation.queryString(statement, showValues)));
}
}
protected boolean isIdempotentOperation() {
return false;
}
public Statement options(Statement statement) {
return statement;
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,69 +1,72 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import net.helenus.core.cache.Facet;
public final class SelectFirstOperation<E>
extends AbstractFilterOptionalOperation<E, SelectFirstOperation<E>> {
private final SelectOperation<E> delegate;
public SelectFirstOperation(SelectOperation<E> delegate) {
super(delegate.sessionOps);
this.delegate = delegate;
this.filters = delegate.filters;
this.ifFilters = delegate.ifFilters;
}
public <R> SelectFirstTransformingOperation<R, E> map(Function<E, R> fn) {
return new SelectFirstTransformingOperation<R, E>(delegate, fn);
}
@Override
public BuiltStatement buildStatement(boolean cached) {
return delegate.buildStatement(cached);
}
@Override
public List<Facet> getFacets() {
return delegate.getFacets();
}
@Override
public List<Facet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override
public Optional<E> transform(ResultSet resultSet) {
return delegate.transform(resultSet).findFirst();
}
@Override
public boolean isSessionCacheable() {
return delegate.isSessionCacheable();
}
@Override
public boolean ignoreCache() { return delegate.ignoreCache(); }
}
/*
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import net.helenus.core.cache.Facet;
public final class SelectFirstOperation<E>
extends AbstractFilterOptionalOperation<E, SelectFirstOperation<E>> {
private final SelectOperation<E> delegate;
public SelectFirstOperation(SelectOperation<E> delegate) {
super(delegate.sessionOps);
this.delegate = delegate;
this.filters = delegate.filters;
this.ifFilters = delegate.ifFilters;
}
public <R> SelectFirstTransformingOperation<R, E> map(Function<E, R> fn) {
return new SelectFirstTransformingOperation<R, E>(delegate, fn);
}
@Override
public BuiltStatement buildStatement(boolean cached) {
return delegate.buildStatement(cached);
}
@Override
public List<Facet> getFacets() {
return delegate.getFacets();
}
@Override
public List<Facet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override
public Optional<E> transform(ResultSet resultSet) {
return delegate.transform(resultSet).findFirst();
}
@Override
public boolean isSessionCacheable() {
return delegate.isSessionCacheable();
}
@Override
public boolean ignoreCache() {
return delegate.ignoreCache();
}
}

View file

@ -1,62 +1,65 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import net.helenus.core.cache.Facet;
public final class SelectFirstTransformingOperation<R, E>
extends AbstractFilterOptionalOperation<R, SelectFirstTransformingOperation<R, E>> {
private final SelectOperation<E> delegate;
private final Function<E, R> fn;
public SelectFirstTransformingOperation(SelectOperation<E> delegate, Function<E, R> fn) {
super(delegate.sessionOps);
this.delegate = delegate;
this.fn = fn;
this.filters = delegate.filters;
this.ifFilters = delegate.ifFilters;
}
@Override
public List<Facet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override
public BuiltStatement buildStatement(boolean cached) {
return delegate.buildStatement(cached);
}
@Override
public Optional<R> transform(ResultSet resultSet) {
return delegate.transform(resultSet).findFirst().map(fn);
}
@Override
public boolean isSessionCacheable() {
return delegate.isSessionCacheable();
}
@Override
public boolean ignoreCache() { return delegate.ignoreCache(); }
}
/*
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import net.helenus.core.cache.Facet;
public final class SelectFirstTransformingOperation<R, E>
extends AbstractFilterOptionalOperation<R, SelectFirstTransformingOperation<R, E>> {
private final SelectOperation<E> delegate;
private final Function<E, R> fn;
public SelectFirstTransformingOperation(SelectOperation<E> delegate, Function<E, R> fn) {
super(delegate.sessionOps);
this.delegate = delegate;
this.fn = fn;
this.filters = delegate.filters;
this.ifFilters = delegate.ifFilters;
}
@Override
public List<Facet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override
public BuiltStatement buildStatement(boolean cached) {
return delegate.buildStatement(cached);
}
@Override
public Optional<R> transform(ResultSet resultSet) {
return delegate.transform(resultSet).findFirst().map(fn);
}
@Override
public boolean isSessionCacheable() {
return delegate.isSessionCacheable();
}
@Override
public boolean ignoreCache() {
return delegate.ignoreCache();
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -23,7 +24,6 @@ import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.querybuilder.Select.Selection;
import com.datastax.driver.core.querybuilder.Select.Where;
import com.google.common.collect.Iterables;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Stream;
@ -97,7 +97,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
this.implementsEntityType = Entity.class.isAssignableFrom(entity.getMappingInterface());
}
public SelectOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity, Function<Row, E> rowMapper) {
public SelectOperation(
AbstractSessionOperations sessionOperations,
HelenusEntity entity,
Function<Row, E> rowMapper) {
super(sessionOperations);
this.rowMapper = rowMapper;
@ -112,8 +115,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
this.implementsEntityType = Entity.class.isAssignableFrom(entity.getMappingInterface());
}
public SelectOperation(AbstractSessionOperations sessionOperations, Function<Row, E> rowMapper,
HelenusPropertyNode... props) {
public SelectOperation(
AbstractSessionOperations sessionOperations,
Function<Row, E> rowMapper,
HelenusPropertyNode... props) {
super(sessionOperations);
@ -310,7 +315,9 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
for (Filter<?> filter : filters.values()) {
where.and(filter.getClause(sessionOps.getValuePreparer()));
HelenusProperty filterProp = filter.getNode().getProperty();
HelenusProperty prop = props.stream()
HelenusProperty prop =
props
.stream()
.map(HelenusPropertyNode::getProperty)
.filter(thisProp -> thisProp.getPropertyName().equals(filterProp.getPropertyName()))
.findFirst()

View file

@ -1,65 +1,70 @@
/*
* Copyright (C) 2015 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import net.helenus.core.cache.Facet;
public final class SelectTransformingOperation<R, E>
extends AbstractFilterStreamOperation<R, SelectTransformingOperation<R, E>> {
private final SelectOperation<E> delegate;
private final Function<E, R> fn;
public SelectTransformingOperation(SelectOperation<E> delegate, Function<E, R> fn) {
super(delegate.sessionOps);
this.delegate = delegate;
this.fn = fn;
this.filters = delegate.filters;
this.ifFilters = delegate.ifFilters;
}
@Override
public List<Facet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override
public List<Facet> getFacets() {
return delegate.getFacets();
}
@Override
public BuiltStatement buildStatement(boolean cached) {
return delegate.buildStatement(cached);
}
@Override
public Stream<R> transform(ResultSet resultSet) {
return delegate.transform(resultSet).map(fn);
}
@Override
public boolean isSessionCacheable() { return delegate.isSessionCacheable(); }
@Override
public boolean ignoreCache() { return delegate.ignoreCache(); }
}
/*
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import net.helenus.core.cache.Facet;
public final class SelectTransformingOperation<R, E>
extends AbstractFilterStreamOperation<R, SelectTransformingOperation<R, E>> {
private final SelectOperation<E> delegate;
private final Function<E, R> fn;
public SelectTransformingOperation(SelectOperation<E> delegate, Function<E, R> fn) {
super(delegate.sessionOps);
this.delegate = delegate;
this.fn = fn;
this.filters = delegate.filters;
this.ifFilters = delegate.ifFilters;
}
@Override
public List<Facet> bindFacetValues() {
return delegate.bindFacetValues();
}
@Override
public List<Facet> getFacets() {
return delegate.getFacets();
}
@Override
public BuiltStatement buildStatement(boolean cached) {
return delegate.buildStatement(cached);
}
@Override
public Stream<R> transform(ResultSet resultSet) {
return delegate.transform(resultSet).map(fn);
}
@Override
public boolean isSessionCacheable() {
return delegate.isSessionCacheable();
}
@Override
public boolean ignoreCache() {
return delegate.ignoreCache();
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -34,19 +35,16 @@ import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.HelenusProperty;
import net.helenus.mapping.MappingUtil;
import net.helenus.mapping.value.BeanColumnValueProvider;
import net.helenus.mapping.value.ValueProviderMap;
import net.helenus.support.HelenusException;
import net.helenus.support.HelenusMappingException;
import net.helenus.support.Immutables;
import static net.helenus.mapping.ColumnType.CLUSTERING_COLUMN;
import static net.helenus.mapping.ColumnType.PARTITION_KEY;
public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateOperation<E>> {
private final Map<Assignment, BoundFacet> assignments = new HashMap<>();
private final AbstractEntityDraft<E> draft;
private final Map<String, Object> draftMap;
private final Set<String> readSet;
private HelenusEntity entity = null;
private Object pojo;
private int[] ttl;
@ -57,6 +55,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
super(sessionOperations);
this.draft = null;
this.draftMap = null;
this.readSet = null;
}
public UpdateOperation(
@ -64,6 +63,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
super(sessionOperations);
this.draft = draft;
this.draftMap = draft.toMap();
this.readSet = draft.read();
}
public UpdateOperation(AbstractSessionOperations sessionOperations, Object pojo) {
@ -75,7 +75,12 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
this.entity = Helenus.resolve(MappingUtil.getMappingInterface(pojo));
if (this.entity != null && entity.isCacheable() && pojo instanceof MapExportable) {
this.pojo = pojo;
this.readSet = ((MapExportable) pojo).toReadSet();
} else {
this.readSet = null;
}
} else {
this.readSet = null;
}
}
@ -84,6 +89,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
super(sessionOperations);
this.draft = null;
this.draftMap = null;
this.readSet = null;
Object value = sessionOps.getValuePreparer().prepareColumnValue(v, p.getProperty());
assignments.put(QueryBuilder.set(p.getColumnName(), value), new BoundFacet(p.getProperty(), v));
@ -110,7 +116,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
if (pojo != null) {
if (!BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop).equals(v)) {
String key = prop.getPropertyName();
((MapExportable)pojo).put(key, v);
((MapExportable) pojo).put(key, v);
}
}
@ -421,7 +427,8 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
Object valueObj = value;
Optional<Function<Object, Object>> converter = prop.getWriteConverter(sessionOps.getSessionRepository());
Optional<Function<Object, Object>> converter =
prop.getWriteConverter(sessionOps.getSessionRepository());
if (converter.isPresent()) {
List convertedList = (List) converter.get().apply(Immutables.listOf(value));
valueObj = convertedList.get(0);
@ -436,7 +443,8 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
List valueObj = value;
Optional<Function<Object, Object>> converter = prop.getWriteConverter(sessionOps.getSessionRepository());
Optional<Function<Object, Object>> converter =
prop.getWriteConverter(sessionOps.getSessionRepository());
if (converter.isPresent()) {
valueObj = (List) converter.get().apply(value);
}
@ -581,7 +589,8 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
HelenusProperty prop = p.getProperty();
Object valueObj = value;
Optional<Function<Object, Object>> converter = prop.getWriteConverter(sessionOps.getSessionRepository());
Optional<Function<Object, Object>> converter =
prop.getWriteConverter(sessionOps.getSessionRepository());
if (converter.isPresent()) {
Set convertedSet = (Set) converter.get().apply(Immutables.setOf(value));
valueObj = convertedSet.iterator().next();
@ -595,7 +604,8 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
HelenusProperty prop = p.getProperty();
Set valueObj = value;
Optional<Function<Object, Object>> converter = prop.getWriteConverter(sessionOps.getSessionRepository());
Optional<Function<Object, Object>> converter =
prop.getWriteConverter(sessionOps.getSessionRepository());
if (converter.isPresent()) {
valueObj = (Set) converter.get().apply(value);
}
@ -634,9 +644,11 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
facet = null;
}
Optional<Function<Object, Object>> converter = prop.getWriteConverter(sessionOps.getSessionRepository());
Optional<Function<Object, Object>> converter =
prop.getWriteConverter(sessionOps.getSessionRepository());
if (converter.isPresent()) {
Map<Object, Object> convertedMap = (Map<Object, Object>) converter.get().apply(Immutables.mapOf(key, value));
Map<Object, Object> convertedMap =
(Map<Object, Object>) converter.get().apply(Immutables.mapOf(key, value));
for (Map.Entry<Object, Object> e : convertedMap.entrySet()) {
assignments.put(QueryBuilder.put(p.getColumnName(), e.getKey(), e.getValue()), facet);
}
@ -672,7 +684,8 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
facet = null;
}
Optional<Function<Object, Object>> converter = prop.getWriteConverter(sessionOps.getSessionRepository());
Optional<Function<Object, Object>> converter =
prop.getWriteConverter(sessionOps.getSessionRepository());
if (converter.isPresent()) {
Map convertedMap = (Map) converter.get().apply(map);
assignments.put(QueryBuilder.putAll(p.getColumnName(), convertedMap), facet);
@ -780,16 +793,39 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
}
}
@Override
protected boolean isIdempotentOperation() {
return assignments
.values()
.stream()
.allMatch(
facet -> {
if (facet != null) {
Set<HelenusProperty> props = facet.getProperties();
if (props != null && props.size() > 0) {
return props.stream().allMatch(prop -> prop.isIdempotent());
} else {
return true;
}
} else {
// In this case our UPDATE statement made mutations via the List, Set, Map methods only.
return false;
}
})
|| super.isIdempotentOperation();
}
@Override
public E sync() throws TimeoutException {
E result = super.sync();
if (entity.isCacheable()) {
if (result != null && entity.isCacheable()) {
if (draft != null) {
sessionOps.updateCache(draft, bindFacetValues());
adjustTtlAndWriteTime(draft);
adjustTtlAndWriteTime((MapExportable) result);
sessionOps.updateCache(result, bindFacetValues());
} else if (pojo != null) {
adjustTtlAndWriteTime((MapExportable) pojo);
sessionOps.updateCache(pojo, bindFacetValues());
adjustTtlAndWriteTime((MapExportable)pojo);
} else {
sessionOps.cacheEvict(bindFacetValues());
}
@ -803,16 +839,18 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
return sync();
}
E result = super.sync(uow);
if (draft != null) {
adjustTtlAndWriteTime(draft);
if (result != null) {
if (draft != null) {
adjustTtlAndWriteTime(draft);
}
if (entity != null && MapExportable.class.isAssignableFrom(entity.getMappingInterface())) {
adjustTtlAndWriteTime((MapExportable) result);
cacheUpdate(uow, result, bindFacetValues());
} else if (pojo != null) {
adjustTtlAndWriteTime((MapExportable) pojo);
cacheUpdate(uow, (E) pojo, bindFacetValues());
return (E) pojo;
}
cacheUpdate(uow, result, bindFacetValues());
} else if (pojo != null) {
cacheUpdate(uow, (E) pojo, bindFacetValues());
adjustTtlAndWriteTime((MapExportable)pojo);
return (E) pojo;
}
return result;
}
@ -828,7 +866,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
adjustTtlAndWriteTime(draft);
} else if (pojo != null) {
result = (E) pojo;
adjustTtlAndWriteTime((MapExportable)pojo);
adjustTtlAndWriteTime((MapExportable) pojo);
} else {
result = null;
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,4 +23,6 @@ public interface Drafted<T> extends MapExportable {
Set<String> mutated();
T build();
Set<String> read();
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,12 +19,55 @@ package net.helenus.core.reflect;
import net.helenus.core.Getter;
public interface Entity {
String WRITTEN_AT_METHOD = "writtenAt";
String TTL_OF_METHOD = "ttlOf";
String WRITTEN_AT_METHOD = "writtenAt";
String TTL_OF_METHOD = "ttlOf";
String TOKEN_OF_METHOD = "tokenOf";
default Long writtenAt(Getter getter) { return 0L; }
default Long writtenAt(String prop) { return 0L; };
/**
* The write time for the property in question referenced by the getter.
*
* @param getter the property getter
* @return the timestamp associated with the property identified by the getter
*/
default Long writtenAt(Getter getter) {
return 0L;
}
default Integer ttlOf(Getter getter) { return 0; };
default Integer ttlOf(String prop) {return 0; };
/**
* The write time for the property in question referenced by the property name.
*
* @param prop the name of a property in this entity
* @return the timestamp associated with the property identified by the property name if it exists
*/
default Long writtenAt(String prop) {
return 0L;
};
/**
* The time-to-live for the property in question referenced by the getter.
*
* @param getter the property getter
* @return the time-to-live in seconds associated with the property identified by the getter
*/
default Integer ttlOf(Getter getter) {
return 0;
};
/**
* The time-to-live for the property in question referenced by the property name.
*
* @param prop the name of a property in this entity
* @return the time-to-live in seconds associated with the property identified by the property name if it exists
*/
default Integer ttlOf(String prop) {
return 0;
};
/**
* The token (partition identifier) for this entity which can change over time if
* the cluster grows or shrinks but should be stable otherwise.
*
* @return the token for the entity
*/
default Long tokenOf() { return 0L; }
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -63,6 +64,11 @@ public final class HelenusNamedProperty implements HelenusProperty {
return false;
}
@Override
public boolean isIdempotent() {
return false;
}
@Override
public Class<?> getJavaType() {
throw new HelenusMappingException("will never called");

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,12 +17,25 @@
package net.helenus.core.reflect;
import java.util.Map;
import java.util.Set;
import net.helenus.core.Getter;
public interface MapExportable {
String TO_MAP_METHOD = "toMap";
String TO_READ_SET_METHOD = "toReadSet";
String PUT_METHOD = "put";
Map<String, Object> toMap();
default Map<String, Object> toMap(boolean mutable) { return null; }
default void put(String key, Object value) { }
default Map<String, Object> toMap(boolean mutable) {
return null;
}
default Set<String> toReadSet() {
return null;
}
default void put(String key, Object value) {}
default <T> void put(Getter<T> getter, T value) {}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -15,14 +16,9 @@
*/
package net.helenus.core.reflect;
import net.helenus.core.Getter;
import net.helenus.core.Helenus;
import net.helenus.core.cache.CacheUtil;
import net.helenus.mapping.MappingUtil;
import net.helenus.mapping.annotation.Transient;
import net.helenus.mapping.value.ValueProviderMap;
import net.helenus.support.HelenusException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamException;
@ -32,15 +28,20 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
import net.helenus.core.Getter;
import net.helenus.core.Helenus;
import net.helenus.core.cache.CacheUtil;
import net.helenus.mapping.MappingUtil;
import net.helenus.mapping.annotation.Transient;
import net.helenus.mapping.value.ValueProviderMap;
import net.helenus.support.HelenusException;
public class MapperInvocationHandler<E> implements InvocationHandler, Serializable {
private static final long serialVersionUID = -7044209982830584984L;
private Map<String, Object> src;
private final Set<String> read = new HashSet<String>();
private final Class<E> iface;
public MapperInvocationHandler(Class<E> iface, Map<String, Object> src) {
@ -101,55 +102,95 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
}
}
if (otherObj instanceof MapExportable) {
return MappingUtil.compareMaps((MapExportable)otherObj, src);
return MappingUtil.compareMaps((MapExportable) otherObj, src);
}
return false;
}
if (MapExportable.PUT_METHOD.equals(methodName) && method.getParameterCount() == 2) {
final String key = (String)args[0];
final Object value = (Object)args[1];
if (src instanceof ValueProviderMap) {
this.src = fromValueProviderMap(src);
final String key;
if (args[0] instanceof String) {
key = (String) args[0];
} else if (args[0] instanceof Getter) {
key = MappingUtil.resolveMappingProperty((Getter) args[0]).getProperty().getPropertyName();
} else {
key = null;
}
if (key != null) {
final Object value = (Object) args[1];
if (src instanceof ValueProviderMap) {
this.src = fromValueProviderMap(src);
}
src.put(key, value);
}
src.put(key, value);
return null;
}
if (Entity.WRITTEN_AT_METHOD.equals(methodName) && method.getParameterCount() == 1) {
final String key;
if (args[0] instanceof String) {
key = CacheUtil.writeTimeKey((String)args[0]);
key = CacheUtil.writeTimeKey((String) args[0]);
} else if (args[0] instanceof Getter) {
Getter getter = (Getter)args[0];
key = CacheUtil.writeTimeKey(MappingUtil.resolveMappingProperty(getter).getProperty().getColumnName().toCql(false));
Getter getter = (Getter) args[0];
key =
CacheUtil.writeTimeKey(
MappingUtil.resolveMappingProperty(getter)
.getProperty()
.getColumnName()
.toCql(false));
} else {
return 0L;
}
Long v = (Long)src.get(key);
Long v = (Long) src.get(key);
if (v != null) {
return v;
}
return 0L;
}
if (Entity.TOKEN_OF_METHOD.equals(methodName) && method.getParameterCount() == 0) {
Long v = (Long) src.get("");
if (v != null) {
return v;
}
return 0L;
}
if (Entity.TTL_OF_METHOD.equals(methodName) && method.getParameterCount() == 1) {
final String key;
if (args[0] instanceof String) {
key = CacheUtil.ttlKey((String)args[0]);
key = CacheUtil.ttlKey((String) args[0]);
} else if (args[0] instanceof Getter) {
Getter getter = (Getter)args[0];
key = CacheUtil.ttlKey(MappingUtil.resolveMappingProperty(getter).getProperty().getColumnName().toCql(false));
Getter getter = (Getter) args[0];
key =
CacheUtil.ttlKey(
MappingUtil.resolveMappingProperty(getter)
.getProperty()
.getColumnName()
.toCql(false));
} else {
return 0;
}
int v[] = (int[])src.get(key);
int v[] = (int[]) src.get(key);
if (v != null) {
return v[0];
}
return 0;
}
if (MapExportable.TO_MAP_METHOD.equals(methodName)) {
if (method.getParameterCount() == 1 && args[0] instanceof Boolean) {
if ((boolean) args[0] == true) {
return fromValueProviderMap(src, true);
}
}
return Collections.unmodifiableMap(src);
}
if (MapExportable.TO_READ_SET_METHOD.equals(methodName)) {
return read;
}
if (method.getParameterCount() != 0 || method.getReturnType() == void.class) {
throw new HelenusException("invalid getter method " + method);
}
@ -174,14 +215,8 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
return Helenus.dsl(iface);
}
if (MapExportable.TO_MAP_METHOD.equals(methodName)) {
if (method.getParameterCount() == 1 && args[0] instanceof Boolean) {
if ((boolean)args[0] == true) { return src; }
}
return Collections.unmodifiableMap(src);
}
final Object value = src.get(methodName);
read.add(methodName);
if (value == null) {
@ -208,12 +243,32 @@ public class MapperInvocationHandler<E> implements InvocationHandler, Serializab
}
static Map<String, Object> fromValueProviderMap(Map v) {
Map<String, Object> m = new HashMap<String, Object>(v.size());
Set<String> keys = v.keySet();
for (String key : keys) {
m.put(key, v.get(key));
return fromValueProviderMap(v, false);
}
static Map<String, Object> fromValueProviderMap(Map v, boolean mutable) {
if (v instanceof ValueProviderMap) {
Map<String, Object> m = new HashMap<String, Object>(v.size());
Set<String> keys = v.keySet();
for (String key : keys) {
Object value = v.get(key);
if (value != null && mutable) {
if (ImmutableList.class.isAssignableFrom(value.getClass())) {
m.put(key, new ArrayList((List) value));
} else if (ImmutableMap.class.isAssignableFrom(value.getClass())) {
m.put(key, new HashMap((Map) value));
} else if (ImmutableSet.class.isAssignableFrom(value.getClass())) {
m.put(key, new HashSet((Set) value));
} else {
m.put(key, value);
}
} else {
m.put(key, value);
}
}
return m;
}
return m;
return v;
}
static class SerializationProxy<E> implements Serializable {

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -34,4 +35,6 @@ public interface HelenusEntity {
HelenusProperty getProperty(String name);
List<Facet> getFacets();
boolean isDraftable();
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,6 +1,7 @@
/*
*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@ -39,6 +40,7 @@ public final class HelenusMappingEntity implements HelenusEntity {
private final HelenusEntityType type;
private final IdentityName name;
private final boolean cacheable;
private final boolean draftable;
private final ImmutableMap<String, Method> methods;
private final ImmutableMap<String, HelenusProperty> props;
private final ImmutableList<HelenusProperty> orderedProps;
@ -112,12 +114,23 @@ public final class HelenusMappingEntity implements HelenusEntity {
// Caching
cacheable = (null != iface.getDeclaredAnnotation(Cacheable.class));
// Draft
Class<?> draft;
try {
draft = Class.forName(iface.getName() + "$Draft");
} catch (Exception ignored) {
draft = null;
}
draftable = (draft != null);
// Materialized view
List<HelenusProperty> primaryKeyProperties = new ArrayList<>();
ImmutableList.Builder<Facet> facetsBuilder = ImmutableList.builder();
if (iface.getDeclaredAnnotation(MaterializedView.class) == null) {
facetsBuilder.add(new Facet("table", name.toCql()).setFixed());
} else {
facetsBuilder.add(new Facet("table", Helenus.entity(iface.getInterfaces()[0]).getName().toCql())
facetsBuilder.add(
new Facet("table", Helenus.entity(iface.getInterfaces()[0]).getName().toCql())
.setFixed());
}
for (HelenusProperty prop : orderedProps) {
@ -131,8 +144,9 @@ public final class HelenusMappingEntity implements HelenusEntity {
facetsBuilder.add(new UnboundFacet(primaryKeyProperties));
primaryKeyProperties = null;
}
for (ConstraintValidator<?, ?> constraint : MappingUtil.getValidators(prop.getGetterMethod())) {
if (constraint.getClass().isAssignableFrom(DistinctValidator.class)) {
for (ConstraintValidator<?, ?> constraint :
MappingUtil.getValidators(prop.getGetterMethod())) {
if (constraint instanceof DistinctValidator) {
DistinctValidator validator = (DistinctValidator) constraint;
String[] values = validator.constraintAnnotation.value();
UnboundFacet facet;
@ -210,6 +224,11 @@ public final class HelenusMappingEntity implements HelenusEntity {
return cacheable;
}
@Override
public boolean isDraftable() {
return draftable;
}
@Override
public Class<?> getMappingInterface() {
return iface;

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -35,6 +36,7 @@ public final class HelenusMappingProperty implements HelenusProperty {
private final String propertyName;
private final Optional<IdentityName> indexName;
private final boolean caseSensitiveIndex;
private final boolean idempotent;
private final ColumnInformation columnInfo;
@ -56,6 +58,15 @@ public final class HelenusMappingProperty implements HelenusProperty {
this.columnInfo = new ColumnInformation(getter);
switch (this.columnInfo.getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
this.idempotent = true;
break;
default:
this.idempotent = MappingUtil.idempotent(getter);
}
this.genericJavaType = getter.getGenericReturnType();
this.javaType = getter.getReturnType();
this.abstractJavaType = MappingJavaTypes.resolveJavaType(this.javaType);
@ -112,6 +123,11 @@ public final class HelenusMappingProperty implements HelenusProperty {
return caseSensitiveIndex;
}
@Override
public boolean isIdempotent() {
return idempotent;
}
@Override
public String getPropertyName() {
return propertyName;

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -37,6 +38,8 @@ public interface HelenusProperty {
boolean caseSensitiveIndex();
boolean isIdempotent();
Class<?> getJavaType();
AbstractDataType getDataType();

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,7 +17,6 @@
package net.helenus.mapping;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
@ -124,14 +124,27 @@ public final class MappingUtil {
return false;
}
public static boolean idempotent(Method getterMethod) {
Column column = getterMethod.getDeclaredAnnotation(Column.class);
if (column != null) {
return column.idempotent();
}
return false;
}
public static String getPropertyName(Method getter) {
return getter.getName();
}
public static HelenusProperty getPropertyForColumn(HelenusEntity entity, String name) {
if (name == null)
return null;
return entity.getOrderedProperties().stream().filter(p -> p.getColumnName().equals(name)).findFirst().orElse(null);
if (name == null) return null;
return entity
.getOrderedProperties()
.stream()
.filter(p -> p.getColumnName().equals(name))
.findFirst()
.orElse(null);
}
public static String getDefaultColumnName(Method getter) {
@ -292,28 +305,6 @@ public final class MappingUtil {
}
}
// https://stackoverflow.com/a/4882306/366692
public static <T> T clone(T object) throws CloneNotSupportedException {
Object clone = null;
// Use reflection, because there is no other way
try {
Method method = object.getClass().getMethod("clone");
clone = method.invoke(object);
} catch (InvocationTargetException e) {
rethrow(e.getCause());
} catch (Exception cause) {
rethrow(cause);
}
if (object.getClass().isInstance(clone)) {
@SuppressWarnings("unchecked") // clone class <= object class <= T
T t = (T) clone;
return t;
} else {
throw new ClassCastException(clone.getClass().getName());
}
}
private static void rethrow(Throwable cause) throws CloneNotSupportedException {
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
@ -331,22 +322,24 @@ public final class MappingUtil {
public static boolean compareMaps(MapExportable me, Map<String, Object> m2) {
Map<String, Object> m1 = me.toMap();
List<String> matching = m2.entrySet()
List<String> matching =
m2.entrySet()
.stream()
.filter(e -> !e.getKey().matches("^_.*_(ttl|writeTime)$"))
.filter(e -> {
String k = e.getKey();
if (m1.containsKey(k)) {
Object o1 = e.getValue();
Object o2 = m1.get(k);
if (o1 == o2 || o1.equals(o2))
return true;
}
return false;
})
.filter(
e -> {
String k = e.getKey();
if (m1.containsKey(k)) {
Object o1 = e.getValue();
Object o2 = m1.get(k);
if (o1 == o2 || o1.equals(o2)) return true;
}
return false;
})
.map(e -> e.getKey())
.collect(Collectors.toList());
List<String> divergent = m1.entrySet()
List<String> divergent =
m1.entrySet()
.stream()
.filter(e -> !e.getKey().matches("^_.*_(ttl|writeTime)$"))
.filter(e -> !matching.contains(e.getKey()))
@ -354,5 +347,4 @@ public final class MappingUtil {
.collect(Collectors.toList());
return divergent.size() > 0 ? false : true;
}
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (C) 2015 The Helenus Authors
* Copyright (C) 2015 The Casser Authors
* Copyright (C) 2015-2018 The Helenus Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

Some files were not shown because too many files have changed in this diff Show more