From d9a0e52df346df6499459d2356d577474338d863 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 2 Oct 2018 10:37:29 -0700 Subject: [PATCH] HBASE-20306 LoadTestTool does not print summary at end of run (Colin Garcia) Conflicts: hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java Amending-Author: Andrew Purtell --- .../hbase/util/MultiThreadedAction.java | 132 +++++++++++------- 1 file changed, 79 insertions(+), 53 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java index 75966fb464..f1a1a70279 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java @@ -19,6 +19,10 @@ package org.apache.hadoop.hbase.util; import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT; import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.stats.UniformSample; +import com.google.common.base.Preconditions; + import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -48,8 +52,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.Mut import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; import org.apache.hadoop.util.StringUtils; -import com.google.common.base.Preconditions; - /** * Common base class for reader and writer parts of multi-thread HBase load * test ({@link LoadTestTool}). @@ -74,6 +76,7 @@ public abstract class MultiThreadedAction { protected AtomicLong numCols = new AtomicLong(); protected AtomicLong totalOpTimeMs = new AtomicLong(); protected boolean verbose = false; + protected volatile boolean isProgressReporterRunning = false; protected LoadTestDataGenerator dataGenerator = null; @@ -179,6 +182,10 @@ public abstract class MultiThreadedAction { private class ProgressReporter implements Runnable { private String reporterId = ""; + private Histogram keysPerSecondHistogram = + YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));; + private Histogram latencyHistogram = + YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));; public ProgressReporter(String id) { this.reporterId = id; @@ -186,58 +193,76 @@ public abstract class MultiThreadedAction { @Override public void run() { - long startTime = System.currentTimeMillis(); - long priorNumKeys = 0; - long priorCumulativeOpTime = 0; - int priorAverageKeysPerSecond = 0; - - // Give other threads time to start. - Threads.sleep(REPORTING_INTERVAL_MS); - - while (numThreadsWorking.get() != 0) { - String threadsLeft = - "[" + reporterId + ":" + numThreadsWorking.get() + "] "; - if (numKeys.get() == 0) { - LOG.info(threadsLeft + "Number of keys = 0"); - } else { - long numKeys = MultiThreadedAction.this.numKeys.get(); - long time = System.currentTimeMillis() - startTime; - long totalOpTime = totalOpTimeMs.get(); - - long numKeysDelta = numKeys - priorNumKeys; - long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime; - - double averageKeysPerSecond = - (time > 0) ? (numKeys * 1000 / time) : 0; - - LOG.info(threadsLeft - + "Keys=" - + numKeys - + ", cols=" - + StringUtils.humanReadableInt(numCols.get()) - + ", time=" - + formatTime(time) - + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= " - + numKeys * 1000 / time + ", latency=" - + String.format("%.2f", (double)totalOpTime / (double)numKeys) - + " ms]") : "") - + ((numKeysDelta > 0) ? (" Current: [" + "keys/s=" - + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency=" - + String.format("%.2f", (double)totalOpTimeDelta / (double)numKeysDelta) - + " ms]") : "") - + progressInfo()); - - if (streamingCounters) { - printStreamingCounters(numKeysDelta, - averageKeysPerSecond - priorAverageKeysPerSecond); - } - - priorNumKeys = numKeys; - priorCumulativeOpTime = totalOpTime; - priorAverageKeysPerSecond = (int) averageKeysPerSecond; - } + isProgressReporterRunning = true; + try { + long startTime = System.currentTimeMillis(); + long priorNumKeys = 0; + long priorCumulativeOpTime = 0; + int priorAverageKeysPerSecond = 0; + // Give other threads time to start. Threads.sleep(REPORTING_INTERVAL_MS); + + while (numThreadsWorking.get() != 0) { + String threadsLeft = + "[" + reporterId + ":" + numThreadsWorking.get() + "] "; + if (numKeys.get() == 0) { + LOG.info(threadsLeft + "Number of keys = 0"); + } else { + long numKeys = MultiThreadedAction.this.numKeys.get(); + long time = System.currentTimeMillis() - startTime; + long totalOpTime = totalOpTimeMs.get(); + + long numKeysDelta = numKeys - priorNumKeys; + long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime; + + double averageKeysPerSecond = + (time > 0) ? (numKeys * 1000 / time) : 0; + long currentKeysPerSecond = numKeysDelta * 1000 / REPORTING_INTERVAL_MS; + double currentLatency = + (numKeysDelta > 0) ? ((double) totalOpTimeDelta / (double) (numKeysDelta)) : 0; + + LOG.info(threadsLeft + + "Keys=" + + numKeys + + ", cols=" + + StringUtils.humanReadableInt(numCols.get()) + + ", time=" + + formatTime(time) + + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= " + + numKeys * 1000 / time + ", latency=" + + String.format("%.2f", (double)totalOpTime / (double)numKeys) + + " ms]") : "") + + ((numKeysDelta > 0) ? (" Current: [" + "keys/s=" + + currentKeysPerSecond + ", latency=" + + String.format("%.2f", currentLatency) + + " ms]") : "") + + progressInfo()); + + keysPerSecondHistogram.update(currentKeysPerSecond); + latencyHistogram.update((long) currentLatency); + + if (streamingCounters) { + printStreamingCounters(numKeysDelta, + averageKeysPerSecond - priorAverageKeysPerSecond); + } + priorNumKeys = numKeys; + priorCumulativeOpTime = totalOpTime; + priorAverageKeysPerSecond = (int) averageKeysPerSecond; + + } + Threads.sleep(REPORTING_INTERVAL_MS); + } + LOG.info("RUN SUMMARY \n" + + "KEYS PER SECOND: \n" + + YammerHistogramUtils.getHistogramReport(keysPerSecondHistogram) + + " \n LATENCY: \n" + + YammerHistogramUtils.getHistogramReport(latencyHistogram) + ); + isProgressReporterRunning = false; + } catch (Exception ex) { + isProgressReporterRunning = false; + LOG.error("ProgressReporterThread exited early: " + ex); } } @@ -252,6 +277,7 @@ public abstract class MultiThreadedAction { System.err.println("reporter:counter:avgKeysPerSecond," + reporterId + "," + (long) (avgKeysPerSecondDelta)); } + } public void close() { @@ -265,7 +291,7 @@ public abstract class MultiThreadedAction { } public void waitForFinish() { - while (numThreadsWorking.get() != 0) { + while (isProgressReporterRunning) { Threads.sleepWithoutInterrupt(1000); } close(); -- 2.19.0