diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java index 410dd0c81d..dec4729a7e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.util; import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.UniformReservoir; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import java.io.IOException; @@ -70,6 +72,7 @@ public abstract class MultiThreadedAction { protected AtomicLong numCols = new AtomicLong(); protected AtomicLong totalOpTimeMs = new AtomicLong(); protected boolean verbose = false; + protected volatile boolean isProgressReporterRunning = false; protected LoadTestDataGenerator dataGenerator = null; @@ -175,6 +178,10 @@ public abstract class MultiThreadedAction { private class ProgressReporter implements Runnable { private String reporterId = ""; + private Histogram keysPerSecondHistogram = + YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));; + private Histogram latencyHistogram = + YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));; public ProgressReporter(String id) { this.reporterId = id; @@ -182,58 +189,76 @@ public abstract class MultiThreadedAction { @Override public void run() { - long startTime = System.currentTimeMillis(); - long priorNumKeys = 0; - long priorCumulativeOpTime = 0; - int priorAverageKeysPerSecond = 0; - - // Give other threads time to start. - Threads.sleep(REPORTING_INTERVAL_MS); - - while (numThreadsWorking.get() != 0) { - String threadsLeft = - "[" + reporterId + ":" + numThreadsWorking.get() + "] "; - if (numKeys.get() == 0) { - LOG.info(threadsLeft + "Number of keys = 0"); - } else { - long numKeys = MultiThreadedAction.this.numKeys.get(); - long time = System.currentTimeMillis() - startTime; - long totalOpTime = totalOpTimeMs.get(); - - long numKeysDelta = numKeys - priorNumKeys; - long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime; - - double averageKeysPerSecond = - (time > 0) ? (numKeys * 1000 / time) : 0; - - LOG.info(threadsLeft - + "Keys=" - + numKeys - + ", cols=" - + StringUtils.humanReadableInt(numCols.get()) - + ", time=" - + formatTime(time) - + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= " - + numKeys * 1000 / time + ", latency=" - + String.format("%.2f", (double)totalOpTime / (double)numKeys) - + " ms]") : "") - + ((numKeysDelta > 0) ? (" Current: [" + "keys/s=" - + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency=" - + String.format("%.2f", (double)totalOpTimeDelta / (double)numKeysDelta) - + " ms]") : "") - + progressInfo()); - - if (streamingCounters) { - printStreamingCounters(numKeysDelta, - averageKeysPerSecond - priorAverageKeysPerSecond); - } - - priorNumKeys = numKeys; - priorCumulativeOpTime = totalOpTime; - priorAverageKeysPerSecond = (int) averageKeysPerSecond; - } + isProgressReporterRunning = true; + try { + long startTime = System.currentTimeMillis(); + long priorNumKeys = 0; + long priorCumulativeOpTime = 0; + int priorAverageKeysPerSecond = 0; + // Give other threads time to start. Threads.sleep(REPORTING_INTERVAL_MS); + + while (numThreadsWorking.get() != 0) { + String threadsLeft = + "[" + reporterId + ":" + numThreadsWorking.get() + "] "; + if (numKeys.get() == 0) { + LOG.info(threadsLeft + "Number of keys = 0"); + } else { + long numKeys = MultiThreadedAction.this.numKeys.get(); + long time = System.currentTimeMillis() - startTime; + long totalOpTime = totalOpTimeMs.get(); + + long numKeysDelta = numKeys - priorNumKeys; + long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime; + + double averageKeysPerSecond = + (time > 0) ? (numKeys * 1000 / time) : 0; + long currentKeysPerSecond = numKeysDelta * 1000 / REPORTING_INTERVAL_MS; + double currentLatency = + (numKeysDelta > 0) ? ((double) totalOpTimeDelta / (double) (numKeysDelta)) : 0; + + LOG.info(threadsLeft + + "Keys=" + + numKeys + + ", cols=" + + StringUtils.humanReadableInt(numCols.get()) + + ", time=" + + formatTime(time) + + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= " + + numKeys * 1000 / time + ", latency=" + + String.format("%.2f", (double)totalOpTime / (double)numKeys) + + " ms]") : "") + + ((numKeysDelta > 0) ? (" Current: [" + "keys/s=" + + currentKeysPerSecond + ", latency=" + + String.format("%.2f", currentLatency) + + " ms]") : "") + + progressInfo()); + + keysPerSecondHistogram.update(currentKeysPerSecond); + latencyHistogram.update((long) currentLatency); + + if (streamingCounters) { + printStreamingCounters(numKeysDelta, + averageKeysPerSecond - priorAverageKeysPerSecond); + } + priorNumKeys = numKeys; + priorCumulativeOpTime = totalOpTime; + priorAverageKeysPerSecond = (int) averageKeysPerSecond; + + } + Threads.sleep(REPORTING_INTERVAL_MS); + } + LOG.info("RUN SUMMARY" + + "\nKEYS PER SECOND [" + reporterId + "]: " + + YammerHistogramUtils.getHistogramReport(keysPerSecondHistogram) + + "\nLATENCY [" + reporterId + "]: " + + YammerHistogramUtils.getHistogramReport(latencyHistogram) + ); + isProgressReporterRunning = false; + } catch (Exception ex) { + isProgressReporterRunning = false; + LOG.error("ProgressReporterThread exited early: " + ex); } } @@ -248,6 +273,7 @@ public abstract class MultiThreadedAction { System.err.println("reporter:counter:avgKeysPerSecond," + reporterId + "," + (long) (avgKeysPerSecondDelta)); } + } public void close() { @@ -261,7 +287,7 @@ public abstract class MultiThreadedAction { } public void waitForFinish() { - while (numThreadsWorking.get() != 0) { + while (isProgressReporterRunning) { Threads.sleepWithoutInterrupt(1000); } close();