Refining logging.
This commit is contained in:
parent
1642f09ce9
commit
18f2a057de
7 changed files with 162 additions and 89 deletions
|
@ -18,7 +18,9 @@ package net.helenus.core;
|
|||
import java.io.PrintStream;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.google.common.base.Stopwatch;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -60,7 +62,7 @@ public abstract class AbstractSessionOperations {
|
|||
|
||||
public PreparedStatement prepare(RegularStatement statement) {
|
||||
try {
|
||||
log(statement, null, false);
|
||||
log(statement, null, null, false);
|
||||
return currentSession().prepare(statement);
|
||||
} catch (RuntimeException e) {
|
||||
throw translateException(e);
|
||||
|
@ -69,41 +71,61 @@ public abstract class AbstractSessionOperations {
|
|||
|
||||
public ListenableFuture<PreparedStatement> prepareAsync(RegularStatement statement) {
|
||||
try {
|
||||
log(statement, null, false);
|
||||
log(statement, null, null, false);
|
||||
return currentSession().prepareAsync(statement);
|
||||
} catch (RuntimeException e) {
|
||||
throw translateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public ResultSet execute(Statement statement, boolean showValues) {
|
||||
return execute(statement, null, showValues);
|
||||
public ResultSet execute(Statement statement, boolean showValues) {
|
||||
return execute(statement, null, null, showValues);
|
||||
}
|
||||
|
||||
public ResultSet execute(Statement statement, Stopwatch timer, boolean showValues) {
|
||||
return execute(statement, null, timer, showValues);
|
||||
}
|
||||
|
||||
public ResultSet execute(Statement statement, UnitOfWork uow, boolean showValues) {
|
||||
return execute(statement, uow, null, showValues);
|
||||
}
|
||||
|
||||
public ResultSet execute(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
|
||||
return executeAsync(statement, uow, timer, showValues).getUninterruptibly();
|
||||
}
|
||||
|
||||
public ResultSet execute(Statement statement, UnitOfWork uow, boolean showValues) {
|
||||
return executeAsync(statement, uow, showValues).getUninterruptibly();
|
||||
}
|
||||
public ResultSetFuture executeAsync(Statement statement, boolean showValues) {
|
||||
return executeAsync(statement, null, null, showValues);
|
||||
}
|
||||
|
||||
public ResultSetFuture executeAsync(Statement statement, boolean showValues) {
|
||||
return executeAsync(statement, null, showValues);
|
||||
}
|
||||
public ResultSetFuture executeAsync(Statement statement, Stopwatch timer, boolean showValues) {
|
||||
return executeAsync(statement, null, timer, showValues);
|
||||
}
|
||||
|
||||
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, boolean showValues) {
|
||||
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, boolean showValues) {
|
||||
return executeAsync(statement, uow, null, showValues);
|
||||
}
|
||||
|
||||
public ResultSetFuture executeAsync(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
|
||||
try {
|
||||
log(statement, uow, showValues);
|
||||
log(statement, uow, timer, showValues);
|
||||
return currentSession().executeAsync(statement);
|
||||
} catch (RuntimeException e) {
|
||||
throw translateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
void log(Statement statement, UnitOfWork uow, boolean showValues) {
|
||||
void log(Statement statement, UnitOfWork uow, Stopwatch timer, boolean showValues) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
String timerString = "";
|
||||
String uowString = "";
|
||||
if (uow != null) {
|
||||
uowString = "within UOW(" + uow.hashCode() + "): ";
|
||||
uowString = (timer != null) ? " " : "" + "UOW(" + uow.hashCode() + ")";
|
||||
}
|
||||
LOG.info(String.format("Execute statement %s%s", uowString, statement));
|
||||
if (timer != null) {
|
||||
timerString = String.format(" %s", timer.toString());
|
||||
}
|
||||
LOG.info(String.format("CQL%s%s - %s", uowString, timerString, statement));
|
||||
}
|
||||
if (isShowCql()) {
|
||||
if (statement instanceof BuiltStatement) {
|
||||
|
|
|
@ -46,8 +46,8 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
protected int cacheMisses = 0;
|
||||
protected int databaseLookups = 0;
|
||||
protected Stopwatch elapsedTime;
|
||||
protected Stopwatch databaseTime = Stopwatch.createUnstarted();
|
||||
protected Stopwatch cacheLookupTime = Stopwatch.createUnstarted();
|
||||
protected Map<String, Double> databaseTime = new HashMap<>();
|
||||
protected double cacheLookupTime = 0.0;
|
||||
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
|
||||
private boolean aborted = false;
|
||||
private boolean committed = false;
|
||||
|
@ -60,14 +60,19 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
}
|
||||
|
||||
@Override
|
||||
public Stopwatch getExecutionTimer() {
|
||||
return databaseTime;
|
||||
}
|
||||
public void addDatabaseTime(String name, Stopwatch amount) {
|
||||
Double time = databaseTime.get(name);
|
||||
if (time == null) {
|
||||
databaseTime.put(name, (double)amount.elapsed(TimeUnit.MICROSECONDS));
|
||||
} else {
|
||||
databaseTime.put(name, time + amount.elapsed(TimeUnit.MICROSECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stopwatch getCacheLookupTimer() {
|
||||
return cacheLookupTime;
|
||||
}
|
||||
@Override
|
||||
public void addCacheLookupTime(Stopwatch amount) {
|
||||
cacheLookupTime += amount.elapsed(TimeUnit.MICROSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addNestedUnitOfWork(UnitOfWork<E> uow) {
|
||||
|
@ -79,7 +84,7 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
@Override
|
||||
public UnitOfWork<E> begin() {
|
||||
elapsedTime = Stopwatch.createStarted();
|
||||
// log.record(txn::start)
|
||||
// log.recordCacheAndDatabaseOperationCount(txn::start)
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -90,7 +95,7 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
}
|
||||
|
||||
@Override
|
||||
public void record(int cache, int ops) {
|
||||
public void recordCacheAndDatabaseOperationCount(int cache, int ops) {
|
||||
if (cache > 0) {
|
||||
cacheHits += cache;
|
||||
} else {
|
||||
|
@ -103,8 +108,8 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
|
||||
public String logTimers(String what) {
|
||||
double e = (double) elapsedTime.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
|
||||
double d = (double) databaseTime.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
|
||||
double c = (double) cacheLookupTime.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
|
||||
double d = databaseTime.containsKey("Cassandra") ? databaseTime.get("Cassandra") / 1000.0 : 0.0;
|
||||
double c = cacheLookupTime / 1000.0;
|
||||
double fd = (d / e) * 100.0;
|
||||
double fc = (c / e) * 100.0;
|
||||
double dat = d + c;
|
||||
|
@ -112,7 +117,7 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
String nested = this.nested.stream().map(uow -> String.valueOf(uow.hashCode()))
|
||||
.collect(Collectors.joining(", "));
|
||||
return String.format(Locale.US,
|
||||
"UOW(%s%s) %s (total: %,.3fms cache: %,.3fms %,2.2f%% (%,d hit, %,d miss) %,d database operation%s took %,.3fms %,2.2f%% [%,.3fms %,2.2f%%])%s",
|
||||
"UOW(%s%s) %s (total: %,.3fms cache: %,.3fms %,2.2f%% (%,d hit, %,d miss) %,d Cassandra operation%s took %,.3fms %,2.2f%% [%,.3fms %,2.2f%%])%s",
|
||||
hashCode(), (this.nested.size() > 0 ? ", [" + nested + "]" : ""), what, e, c, fc, cacheHits,
|
||||
cacheMisses, databaseLookups, (databaseLookups > 1) ? "s" : "", d, fd, dat, daf,
|
||||
(purpose == null ? "" : " in " + purpose));
|
||||
|
@ -191,7 +196,7 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
}
|
||||
}
|
||||
|
||||
// log.record(txn::provisionalCommit)
|
||||
// log.recordCacheAndDatabaseOperationCount(txn::provisionalCommit)
|
||||
// examine log for conflicts in read-set and write-set between begin and
|
||||
// provisional commit
|
||||
// if (conflict) { throw new ConflictingUnitOfWorkException(this) }
|
||||
|
@ -234,7 +239,7 @@ public abstract class AbstractUnitOfWork<E extends Exception> implements UnitOfW
|
|||
uow.committed = false;
|
||||
uow.aborted = true;
|
||||
});
|
||||
// log.record(txn::abort)
|
||||
// log.recordCacheAndDatabaseOperationCount(txn::abort)
|
||||
// cache.invalidateSince(txn::start time)
|
||||
if (!hasAborted()) {
|
||||
elapsedTime.stop();
|
||||
|
|
|
@ -25,7 +25,7 @@ import net.helenus.core.cache.Facet;
|
|||
public interface UnitOfWork<X extends Exception> extends AutoCloseable {
|
||||
|
||||
/**
|
||||
* Marks the beginning of a transactional section of work. Will write a record
|
||||
* Marks the beginning of a transactional section of work. Will write a recordCacheAndDatabaseOperationCount
|
||||
* to the shared write-ahead log.
|
||||
*
|
||||
* @return the handle used to commit or abort the work.
|
||||
|
@ -61,10 +61,10 @@ public interface UnitOfWork<X extends Exception> extends AutoCloseable {
|
|||
|
||||
UnitOfWork setPurpose(String purpose);
|
||||
|
||||
Stopwatch getExecutionTimer();
|
||||
void addDatabaseTime(String name, Stopwatch amount);
|
||||
void addCacheLookupTime(Stopwatch amount);
|
||||
|
||||
Stopwatch getCacheLookupTimer();
|
||||
|
||||
void record(int cache, int ops);
|
||||
// Cache > 0 means "cache hit", < 0 means cache miss.
|
||||
void recordCacheAndDatabaseOperationCount(int cache, int database);
|
||||
|
||||
}
|
||||
|
|
|
@ -71,7 +71,12 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
|
|||
if (cacheResult != null) {
|
||||
result = Optional.of(cacheResult);
|
||||
updateCache = false;
|
||||
}
|
||||
sessionCacheHits.mark();
|
||||
cacheHits.mark();
|
||||
} else {
|
||||
sessionCacheMiss.mark();
|
||||
cacheMiss.mark();
|
||||
}
|
||||
}
|
||||
|
||||
if (!result.isPresent()) {
|
||||
|
@ -103,27 +108,41 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
|
|||
try {
|
||||
|
||||
Optional<E> result = Optional.empty();
|
||||
E cacheResult = null;
|
||||
E cachedResult = null;
|
||||
boolean updateCache = true;
|
||||
|
||||
if (enableCache) {
|
||||
Stopwatch timer = uow.getCacheLookupTimer();
|
||||
timer.start();
|
||||
List<Facet> facets = bindFacetValues();
|
||||
cacheResult = checkCache(uow, facets);
|
||||
if (cacheResult != null) {
|
||||
result = Optional.of(cacheResult);
|
||||
updateCache = false;
|
||||
} else {
|
||||
if (isSessionCacheable()) {
|
||||
String tableName = CacheUtil.schemaName(facets);
|
||||
cacheResult = (E) sessionOps.checkCache(tableName, facets);
|
||||
if (cacheResult != null) {
|
||||
result = Optional.of(cacheResult);
|
||||
}
|
||||
}
|
||||
}
|
||||
timer.stop();
|
||||
Stopwatch timer = Stopwatch.createStarted();
|
||||
try {
|
||||
List<Facet> facets = bindFacetValues();
|
||||
cachedResult = checkCache(uow, facets);
|
||||
if (cachedResult != null) {
|
||||
result = Optional.of(cachedResult);
|
||||
updateCache = false;
|
||||
uowCacheHits.mark();
|
||||
cacheHits.mark();
|
||||
uow.recordCacheAndDatabaseOperationCount(1, 0);
|
||||
} else {
|
||||
uowCacheMiss.mark();
|
||||
if (isSessionCacheable()) {
|
||||
String tableName = CacheUtil.schemaName(facets);
|
||||
cachedResult = (E) sessionOps.checkCache(tableName, facets);
|
||||
if (cachedResult != null) {
|
||||
result = Optional.of(cachedResult);
|
||||
sessionCacheHits.mark();
|
||||
cacheHits.mark();
|
||||
uow.recordCacheAndDatabaseOperationCount(1, 0);
|
||||
} else {
|
||||
sessionCacheMiss.mark();
|
||||
cacheMiss.mark();
|
||||
uow.recordCacheAndDatabaseOperationCount(-1, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
timer.stop();
|
||||
uow.addCacheLookupTime(timer);
|
||||
}
|
||||
}
|
||||
|
||||
if (!result.isPresent()) {
|
||||
|
|
|
@ -322,17 +322,10 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
|
|||
if (!facets.isEmpty()) {
|
||||
optionalCachedResult = uow.cacheLookup(facets);
|
||||
if (optionalCachedResult.isPresent()) {
|
||||
uowCacheHits.mark();
|
||||
uow.record(1, 0);
|
||||
result = (E) optionalCachedResult.get();
|
||||
}
|
||||
}
|
||||
|
||||
if (result == null) {
|
||||
uowCacheMiss.mark();
|
||||
uow.record(-1, 0);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -72,7 +72,12 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
|
|||
if (cacheResult != null) {
|
||||
resultStream = Stream.of(cacheResult);
|
||||
updateCache = false;
|
||||
}
|
||||
sessionCacheHits.mark();
|
||||
cacheHits.mark();
|
||||
} else {
|
||||
sessionCacheMiss.mark();
|
||||
cacheMiss.mark();
|
||||
}
|
||||
}
|
||||
|
||||
if (resultStream == null) {
|
||||
|
@ -113,15 +118,37 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
|
|||
boolean updateCache = true;
|
||||
|
||||
if (enableCache) {
|
||||
Stopwatch timer = uow.getCacheLookupTimer();
|
||||
timer.start();
|
||||
List<Facet> facets = bindFacetValues();
|
||||
cachedResult = checkCache(uow, facets);
|
||||
if (cachedResult != null) {
|
||||
resultStream = Stream.of(cachedResult);
|
||||
updateCache = false;
|
||||
}
|
||||
timer.stop();
|
||||
Stopwatch timer = Stopwatch.createStarted();
|
||||
try {
|
||||
List<Facet> facets = bindFacetValues();
|
||||
cachedResult = checkCache(uow, facets);
|
||||
if (cachedResult != null) {
|
||||
resultStream = Stream.of(cachedResult);
|
||||
updateCache = false;
|
||||
uowCacheHits.mark();
|
||||
cacheHits.mark();
|
||||
uow.recordCacheAndDatabaseOperationCount(1, 0);
|
||||
} else {
|
||||
uowCacheMiss.mark();
|
||||
if (isSessionCacheable()) {
|
||||
String tableName = CacheUtil.schemaName(facets);
|
||||
cachedResult = (E) sessionOps.checkCache(tableName, facets);
|
||||
if (cachedResult != null) {
|
||||
resultStream = Stream.of(cachedResult);
|
||||
sessionCacheHits.mark();
|
||||
cacheHits.mark();
|
||||
uow.recordCacheAndDatabaseOperationCount(1, 0);
|
||||
} else {
|
||||
sessionCacheMiss.mark();
|
||||
cacheMiss.mark();
|
||||
uow.recordCacheAndDatabaseOperationCount(-1, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
timer.stop();
|
||||
uow.addCacheLookupTime(timer);
|
||||
}
|
||||
}
|
||||
|
||||
if (resultStream == null) {
|
||||
|
|
|
@ -36,15 +36,23 @@ import net.helenus.core.cache.Facet;
|
|||
public abstract class Operation<E> {
|
||||
|
||||
protected final AbstractSessionOperations sessionOps;
|
||||
protected final Meter uowCacheHits;
|
||||
protected final Meter uowCacheMiss;
|
||||
protected final Meter uowCacheHits;
|
||||
protected final Meter uowCacheMiss;
|
||||
protected final Meter sessionCacheHits;
|
||||
protected final Meter sessionCacheMiss;
|
||||
protected final Meter cacheHits;
|
||||
protected final Meter cacheMiss;
|
||||
protected final Timer requestLatency;
|
||||
|
||||
Operation(AbstractSessionOperations sessionOperations) {
|
||||
this.sessionOps = sessionOperations;
|
||||
MetricRegistry metrics = sessionOperations.getMetricRegistry();
|
||||
this.uowCacheHits = metrics.meter("net.helenus.UOW-cache-hits");
|
||||
this.uowCacheMiss = metrics.meter("net.helenus.UOW-cache-miss");
|
||||
this.uowCacheHits = metrics.meter("net.helenus.UOW-cache-hits");
|
||||
this.uowCacheMiss = metrics.meter("net.helenus.UOW-cache-miss");
|
||||
this.sessionCacheHits = metrics.meter("net.helenus.session-cache-hits");
|
||||
this.sessionCacheMiss = metrics.meter("net.helenus.session-cache-miss");
|
||||
this.cacheHits = metrics.meter("net.helenus.cache-hits");
|
||||
this.cacheMiss = metrics.meter("net.helenus.cache-miss");
|
||||
this.requestLatency = metrics.timer("net.helenus.request-latency");
|
||||
}
|
||||
|
||||
|
@ -67,20 +75,19 @@ public abstract class Operation<E> {
|
|||
}
|
||||
|
||||
Statement statement = options(buildStatement(cached));
|
||||
Stopwatch timer = null;
|
||||
if (uow != null) {
|
||||
timer = uow.getExecutionTimer();
|
||||
timer.start();
|
||||
}
|
||||
ResultSetFuture futureResultSet = session.executeAsync(statement, uow, showValues);
|
||||
if (uow != null)
|
||||
uow.record(0, 1);
|
||||
ResultSet resultSet = futureResultSet.getUninterruptibly(); // TODO(gburd): (timeout, units);
|
||||
Stopwatch timer = Stopwatch.createStarted();
|
||||
try {
|
||||
ResultSetFuture futureResultSet = session.executeAsync(statement, uow, timer, showValues);
|
||||
if (uow != null)
|
||||
uow.recordCacheAndDatabaseOperationCount(0, 1);
|
||||
ResultSet resultSet = futureResultSet.getUninterruptibly(); // TODO(gburd): (timeout, units);
|
||||
return resultSet;
|
||||
|
||||
if (uow != null)
|
||||
timer.stop();
|
||||
|
||||
return resultSet;
|
||||
} finally {
|
||||
timer.stop();
|
||||
if (uow != null)
|
||||
uow.addDatabaseTime("Cassandra", timer);
|
||||
}
|
||||
|
||||
} finally {
|
||||
|
||||
|
|
Loading…
Reference in a new issue