Move statement logging into Operation, cover special case for batches. Cleanup UOW commit logging a bit.

This commit is contained in:
Greg Burd 2017-11-09 15:03:30 -05:00
parent 2f0801d36f
commit 6ff188f241
7 changed files with 52 additions and 69 deletions

View file

@ -59,7 +59,6 @@ public abstract class AbstractSessionOperations {
public PreparedStatement prepare(RegularStatement statement) {
try {
logStatement(statement, false);
return currentSession().prepare(statement);
} catch (RuntimeException e) {
throw translateException(e);
@ -68,59 +67,48 @@ public abstract class AbstractSessionOperations {
public ListenableFuture<PreparedStatement> prepareAsync(RegularStatement statement) {
try {
logStatement(statement, false);
return currentSession().prepareAsync(statement);
} catch (RuntimeException e) {
throw translateException(e);
}
}
public ResultSet execute(Statement statement, boolean showValues) {
return execute(statement, null, null, showValues);
public ResultSet execute(Statement statement) {
return execute(statement, null, null);
}
public ResultSet execute(Statement statement, Stopwatch timer, boolean showValues) {
return execute(statement, null, timer, showValues);
public ResultSet execute(Statement statement, Stopwatch timer) {
return execute(statement, null, timer);
}
public ResultSet execute(Statement statement, UnitOfWork uow, boolean showValues) {
return execute(statement, uow, null, showValues);
public ResultSet execute(Statement statement, UnitOfWork uow) {
return execute(statement, uow, null);
}
public ResultSet execute(
Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
return executeAsync(statement, uow, timer, showValues).getUninterruptibly();
public ResultSet execute(Statement statement, UnitOfWork uow, Stopwatch timer) {
return executeAsync(statement, uow, timer).getUninterruptibly();
}
public ResultSetFuture executeAsync(Statement statement, boolean showValues) {
return executeAsync(statement, null, null, showValues);
public ResultSetFuture executeAsync(Statement statement) {
return executeAsync(statement, null, null);
}
public ResultSetFuture executeAsync(Statement statement, Stopwatch timer, boolean showValues) {
return executeAsync(statement, null, timer, showValues);
public ResultSetFuture executeAsync(Statement statement, Stopwatch timer) {
return executeAsync(statement, null, timer);
}
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, boolean showValues) {
return executeAsync(statement, uow, null, showValues);
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow) {
return executeAsync(statement, uow, null);
}
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, Stopwatch timer) {
try {
logStatement(statement, showValues);
return currentSession().executeAsync(statement);
} catch (RuntimeException e) {
throw translateException(e);
}
}
private void logStatement(Statement statement, boolean showValues) {
if (isShowCql()) {
printCql(Operation.queryString(statement, showValues));
} else if (LOG.isDebugEnabled()) {
LOG.info("CQL> " + Operation.queryString(statement, showValues));
}
}
public Tracer getZipkinTracer() {
return null;
}
@ -144,9 +132,5 @@ public abstract class AbstractSessionOperations {
public void updateCache(Object pojo, List<Facet> facets) {}
void printCql(String cql) {
getPrintStream().println(cql);
}
public void cacheEvict(List<Facet> facets) {}
}

View file

@ -796,11 +796,11 @@ public class HelenusSession extends AbstractSessionOperations implements Closeab
switch (entity.getType()) {
case TABLE:
execute(SchemaUtil.dropTable(entity), true);
execute(SchemaUtil.dropTable(entity));
break;
case UDT:
execute(SchemaUtil.dropUserType(entity), true);
execute(SchemaUtil.dropUserType(entity));
break;
default:

View file

@ -35,12 +35,12 @@ public final class TableOperations {
}
public void createTable(HelenusEntity entity) {
sessionOps.execute(SchemaUtil.createTable(entity), true);
sessionOps.execute(SchemaUtil.createTable(entity));
executeBatch(SchemaUtil.createIndexes(entity));
}
public void dropTable(HelenusEntity entity) {
sessionOps.execute(SchemaUtil.dropTable(entity), true);
sessionOps.execute(SchemaUtil.dropTable(entity));
}
public void validateTable(TableMetadata tmd, HelenusEntity entity) {
@ -77,19 +77,14 @@ public final class TableOperations {
}
public void createView(HelenusEntity entity) {
sessionOps.execute(
SchemaUtil.createMaterializedView(
sessionOps.usingKeyspace(), entity.getName().toCql(), entity),
true);
// executeBatch(SchemaUtil.createIndexes(entity)); NOTE: Unfortunately C* 3.10
// does not yet support 2i on materialized views.
sessionOps.execute(SchemaUtil.createMaterializedView(
sessionOps.usingKeyspace(), entity.getName().toCql(), entity));
// executeBatch(SchemaUtil.createIndexes(entity)); NOTE: Unfortunately C* 3.10 does not yet support 2i on materialized views.
}
public void dropView(HelenusEntity entity) {
sessionOps.execute(
SchemaUtil.dropMaterializedView(
sessionOps.usingKeyspace(), entity.getName().toCql(), entity),
true);
SchemaUtil.dropMaterializedView(sessionOps.usingKeyspace(), entity.getName().toCql(), entity));
}
public void updateView(TableMetadata tmd, HelenusEntity entity) {
@ -104,9 +99,6 @@ public final class TableOperations {
private void executeBatch(List<SchemaStatement> list) {
list.forEach(
s -> {
sessionOps.execute(s, true);
});
list.forEach(s -> sessionOps.execute(s));
}
}

View file

@ -33,12 +33,12 @@ public final class UserTypeOperations {
public void createUserType(HelenusEntity entity) {
sessionOps.execute(SchemaUtil.createUserType(entity), true);
sessionOps.execute(SchemaUtil.createUserType(entity));
}
public void dropUserType(HelenusEntity entity) {
sessionOps.execute(SchemaUtil.dropUserType(entity), true);
sessionOps.execute(SchemaUtil.dropUserType(entity));
}
public void validateUserType(UserType userType, HelenusEntity entity) {
@ -71,9 +71,6 @@ public final class UserTypeOperations {
private void executeBatch(List<SchemaStatement> list) {
list.forEach(
s -> {
sessionOps.execute(s, true);
});
list.forEach(s -> sessionOps.execute(s));
}
}

View file

@ -22,8 +22,7 @@ import java.util.Map;
import net.helenus.core.*;
import net.helenus.mapping.HelenusProperty;
public abstract class AbstractFilterStreamOperation<
E, O extends AbstractFilterStreamOperation<E, O>>
public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>
extends AbstractStreamOperation<E, O> {
protected Map<HelenusProperty, Filter<?>> filters = null;

View file

@ -104,11 +104,15 @@ public class BatchOperation extends Operation<Long> {
}
public String toString() {
return toString(true); //TODO(gburd): sessionOps.showQueryValues()
}
public String toString(boolean showValues) {
StringBuilder s = new StringBuilder();
s.append("BEGIN ");
if (!logged) { s.append("UN"); }
s.append("LOGGED BATCH; ");
s.append(operations.stream().map(o -> Operation.queryString(o.buildStatement(false), showValues)).collect(Collectors.joining(" ")));
s.append(operations.stream().map(o -> Operation.queryString(o.buildStatement(showValues), showValues)).collect(Collectors.joining(" ")));
s.append(" APPLY BATCH;");
return s.toString();
}

View file

@ -69,6 +69,10 @@ public abstract class Operation<E> {
this.requestLatency = metrics.timer("net.helenus.request-latency");
}
public static String queryString(BatchOperation operation, boolean includeValues) {
return operation.toString(includeValues);
}
public static String queryString(Statement statement, boolean includeValues) {
String query = null;
if (statement instanceof BuiltStatement) {
@ -88,15 +92,8 @@ public abstract class Operation<E> {
return query;
}
public ResultSet execute(
AbstractSessionOperations session,
UnitOfWork uow,
TraceContext traceContext,
long timeout,
TimeUnit units,
boolean showValues,
boolean cached)
throws TimeoutException {
public ResultSet execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext,
long timeout, TimeUnit units, boolean showValues, boolean cached) throws TimeoutException {
// Start recording in a Zipkin sub-span our execution time to perform this operation.
Tracer tracer = session.getZipkinTracer();
@ -113,9 +110,18 @@ public abstract class Operation<E> {
}
Statement statement = options(buildStatement(cached));
if (session.isShowCql() ) {
String stmt = (this instanceof BatchOperation) ? queryString((BatchOperation)this, showValues) : queryString(statement, showValues);
session.getPrintStream().println(stmt);
} else if (LOG.isDebugEnabled()) {
String stmt = (this instanceof BatchOperation) ? queryString((BatchOperation)this, showValues) : queryString(statement, showValues);
LOG.info("CQL> " + stmt);
}
Stopwatch timer = Stopwatch.createStarted();
try {
ResultSetFuture futureResultSet = session.executeAsync(statement, uow, timer, showValues);
ResultSetFuture futureResultSet = session.executeAsync(statement, uow, timer);
if (uow != null) uow.recordCacheAndDatabaseOperationCount(0, 1);
ResultSet resultSet = futureResultSet.getUninterruptibly(timeout, units);
ColumnDefinitions columnDefinitions = resultSet.getColumnDefinitions();
@ -129,11 +135,12 @@ public abstract class Operation<E> {
.map(InetAddress::toString)
.collect(Collectors.joining(", "));
ConsistencyLevel cl = ei.getAchievedConsistencyLevel();
if (cl == null) { cl = statement.getConsistencyLevel(); }
int se = ei.getSpeculativeExecutions();
String warn = ei.getWarnings().stream().collect(Collectors.joining(", "));
String ri =
String.format(
"%s %s %s %s %s %s%sspec-retries: %d",
"%s %s ~%s %s %s%s%sspec-retries: %d",
"server v" + qh.getCassandraVersion(),
qh.getAddress().toString(),
(oh != null && !oh.equals("")) ? " [tried: " + oh + "]" : "",
@ -141,7 +148,7 @@ public abstract class Operation<E> {
qh.getRack(),
(cl != null)
? (" consistency: "
+ cl.name()
+ cl.name() + " "
+ (cl.isDCLocal() ? " DC " : "")
+ (cl.isSerial() ? " SC " : ""))
: "",