diff --git a/src/main/java/net/helenus/core/operation/BatchOperation.java b/src/main/java/net/helenus/core/operation/BatchOperation.java index 00b990e..13685a4 100644 --- a/src/main/java/net/helenus/core/operation/BatchOperation.java +++ b/src/main/java/net/helenus/core/operation/BatchOperation.java @@ -21,6 +21,7 @@ import com.datastax.driver.core.ResultSet; import com.google.common.base.Stopwatch; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import net.helenus.core.AbstractSessionOperations; @@ -31,7 +32,6 @@ public class BatchOperation extends Operation { private BatchStatement batch = null; private List> operations = new ArrayList>(); private boolean logged = true; - private long timestamp = 0L; public BatchOperation(AbstractSessionOperations sessionOperations) { super(sessionOperations); @@ -47,11 +47,14 @@ public class BatchOperation extends Operation { batch.addAll( operations.stream().map(o -> o.buildStatement(cached)).collect(Collectors.toList())); batch.setConsistencyLevel(sessionOps.getDefaultConsistencyLevel()); - timestamp = System.nanoTime(); - batch.setDefaultTimestamp(timestamp); return batch; } + private long captureTimestampMsec() { + // Java 9: Instant.now().truncatedTo( ChronoUnit.MICROSECONDS ); + return TimeUnit.NANOSECONDS.convert(System.nanoTime(), TimeUnit.MICROSECONDS); + } + public BatchOperation logged() { logged = true; return this; @@ -66,8 +69,7 @@ public class BatchOperation extends Operation { if (operations.size() == 0) return 0L; final Timer.Context context = requestLatency.time(); try { - timestamp = System.nanoTime(); - batch.setDefaultTimestamp(timestamp); + batch.setDefaultTimestamp(captureTimestampMsec()); ResultSet resultSet = this.execute( sessionOps, @@ -83,7 +85,7 @@ public class BatchOperation extends Operation { } finally { context.stop(); } - return timestamp; + return batch.getDefaultTimestamp(); } public Long sync(UnitOfWork uow) throws TimeoutException { @@ -94,6 +96,7 @@ public class BatchOperation extends Operation { final Stopwatch timer = Stopwatch.createStarted(); try { uow.recordCacheAndDatabaseOperationCount(0, 1); + batch.setDefaultTimestamp(captureTimestampMsec()); ResultSet resultSet = this.execute( sessionOps, @@ -111,7 +114,7 @@ public class BatchOperation extends Operation { timer.stop(); } uow.addDatabaseTime("Cassandra", timer); - return timestamp; + return batch.getDefaultTimestamp(); } public void addAll(BatchOperation batch) { @@ -126,9 +129,13 @@ public class BatchOperation extends Operation { StringBuilder s = new StringBuilder(); s.append("BEGIN "); if (!logged) { - s.append("UN"); + s.append("UNLOGGED "); + } + s.append("BATCH "); + + if (batch.getDefaultTimestamp() > -9223372036854775808L) { + s.append("USING TIMESTAMP ").append(String.valueOf(batch.getDefaultTimestamp())).append(" "); } - s.append("LOGGED BATCH; "); s.append( operations .stream()