Fix logged batch syntax and timestamp logic.

This commit is contained in:
Greg Burd 2018-01-24 14:33:37 -05:00
parent 27dd9a4eff
commit e2f45f82c9

View file

@ -21,6 +21,7 @@ import com.datastax.driver.core.ResultSet;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import net.helenus.core.AbstractSessionOperations; import net.helenus.core.AbstractSessionOperations;
@ -31,7 +32,6 @@ public class BatchOperation extends Operation<Long> {
private BatchStatement batch = null; private BatchStatement batch = null;
private List<AbstractOperation<?, ?>> operations = new ArrayList<AbstractOperation<?, ?>>(); private List<AbstractOperation<?, ?>> operations = new ArrayList<AbstractOperation<?, ?>>();
private boolean logged = true; private boolean logged = true;
private long timestamp = 0L;
public BatchOperation(AbstractSessionOperations sessionOperations) { public BatchOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations); super(sessionOperations);
@ -47,11 +47,14 @@ public class BatchOperation extends Operation<Long> {
batch.addAll( batch.addAll(
operations.stream().map(o -> o.buildStatement(cached)).collect(Collectors.toList())); operations.stream().map(o -> o.buildStatement(cached)).collect(Collectors.toList()));
batch.setConsistencyLevel(sessionOps.getDefaultConsistencyLevel()); batch.setConsistencyLevel(sessionOps.getDefaultConsistencyLevel());
timestamp = System.nanoTime();
batch.setDefaultTimestamp(timestamp);
return batch; return batch;
} }
private long captureTimestampMsec() {
// Java 9: Instant.now().truncatedTo( ChronoUnit.MICROSECONDS );
return TimeUnit.NANOSECONDS.convert(System.nanoTime(), TimeUnit.MICROSECONDS);
}
public BatchOperation logged() { public BatchOperation logged() {
logged = true; logged = true;
return this; return this;
@ -66,8 +69,7 @@ public class BatchOperation extends Operation<Long> {
if (operations.size() == 0) return 0L; if (operations.size() == 0) return 0L;
final Timer.Context context = requestLatency.time(); final Timer.Context context = requestLatency.time();
try { try {
timestamp = System.nanoTime(); batch.setDefaultTimestamp(captureTimestampMsec());
batch.setDefaultTimestamp(timestamp);
ResultSet resultSet = ResultSet resultSet =
this.execute( this.execute(
sessionOps, sessionOps,
@ -83,7 +85,7 @@ public class BatchOperation extends Operation<Long> {
} finally { } finally {
context.stop(); context.stop();
} }
return timestamp; return batch.getDefaultTimestamp();
} }
public Long sync(UnitOfWork<?> uow) throws TimeoutException { public Long sync(UnitOfWork<?> uow) throws TimeoutException {
@ -94,6 +96,7 @@ public class BatchOperation extends Operation<Long> {
final Stopwatch timer = Stopwatch.createStarted(); final Stopwatch timer = Stopwatch.createStarted();
try { try {
uow.recordCacheAndDatabaseOperationCount(0, 1); uow.recordCacheAndDatabaseOperationCount(0, 1);
batch.setDefaultTimestamp(captureTimestampMsec());
ResultSet resultSet = ResultSet resultSet =
this.execute( this.execute(
sessionOps, sessionOps,
@ -111,7 +114,7 @@ public class BatchOperation extends Operation<Long> {
timer.stop(); timer.stop();
} }
uow.addDatabaseTime("Cassandra", timer); uow.addDatabaseTime("Cassandra", timer);
return timestamp; return batch.getDefaultTimestamp();
} }
public void addAll(BatchOperation batch) { public void addAll(BatchOperation batch) {
@ -126,9 +129,13 @@ public class BatchOperation extends Operation<Long> {
StringBuilder s = new StringBuilder(); StringBuilder s = new StringBuilder();
s.append("BEGIN "); s.append("BEGIN ");
if (!logged) { if (!logged) {
s.append("UN"); s.append("UNLOGGED ");
}
s.append("BATCH ");
if (batch.getDefaultTimestamp() > -9223372036854775808L) {
s.append("USING TIMESTAMP ").append(String.valueOf(batch.getDefaultTimestamp())).append(" ");
} }
s.append("LOGGED BATCH; ");
s.append( s.append(
operations operations
.stream() .stream()