diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java index 410dd0c81d..c89942bb17 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java @@ -70,6 +70,7 @@ public abstract class MultiThreadedAction { protected AtomicLong numCols = new AtomicLong(); protected AtomicLong totalOpTimeMs = new AtomicLong(); protected boolean verbose = false; + protected volatile boolean isProgressReporterRunning = false; protected LoadTestDataGenerator dataGenerator = null; @@ -182,58 +183,57 @@ public abstract class MultiThreadedAction { @Override public void run() { - long startTime = System.currentTimeMillis(); - long priorNumKeys = 0; - long priorCumulativeOpTime = 0; - int priorAverageKeysPerSecond = 0; - - // Give other threads time to start. - Threads.sleep(REPORTING_INTERVAL_MS); - - while (numThreadsWorking.get() != 0) { - String threadsLeft = - "[" + reporterId + ":" + numThreadsWorking.get() + "] "; - if (numKeys.get() == 0) { - LOG.info(threadsLeft + "Number of keys = 0"); - } else { - long numKeys = MultiThreadedAction.this.numKeys.get(); - long time = System.currentTimeMillis() - startTime; - long totalOpTime = totalOpTimeMs.get(); - - long numKeysDelta = numKeys - priorNumKeys; - long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime; - - double averageKeysPerSecond = - (time > 0) ? (numKeys * 1000 / time) : 0; - - LOG.info(threadsLeft - + "Keys=" - + numKeys - + ", cols=" - + StringUtils.humanReadableInt(numCols.get()) - + ", time=" - + formatTime(time) - + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= " - + numKeys * 1000 / time + ", latency=" - + String.format("%.2f", (double)totalOpTime / (double)numKeys) - + " ms]") : "") - + ((numKeysDelta > 0) ? (" Current: [" + "keys/s=" - + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency=" - + String.format("%.2f", (double)totalOpTimeDelta / (double)numKeysDelta) - + " ms]") : "") - + progressInfo()); - - if (streamingCounters) { - printStreamingCounters(numKeysDelta, - averageKeysPerSecond - priorAverageKeysPerSecond); - } - - priorNumKeys = numKeys; - priorCumulativeOpTime = totalOpTime; - priorAverageKeysPerSecond = (int) averageKeysPerSecond; - } + isProgressReporterRunning = true; + try { + long startTime = System.currentTimeMillis(); + long priorNumKeys = 0; + long priorCumulativeOpTime = 0; + int priorAverageKeysPerSecond = 0; + long time = System.currentTimeMillis() - startTime; + // Give other threads time to start. Threads.sleep(REPORTING_INTERVAL_MS); + + while (numThreadsWorking.get() != 0) { + String threadsLeft = + "[" + reporterId + ":" + numThreadsWorking.get() + "] "; + if (numKeys.get() == 0) { + LOG.info(threadsLeft + "Number of keys = 0"); + } else { + long numKeys = MultiThreadedAction.this.numKeys.get(); + time = System.currentTimeMillis() - startTime; + long totalOpTime = totalOpTimeMs.get(); + + long numKeysDelta = numKeys - priorNumKeys; + long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime; + + double averageKeysPerSecond = + (time > 0) ? (numKeys * 1000 / time) : 0; + + LOG.info(threadsLeft + + getOverallRunInformation(numKeys, time, totalOpTime) + + ((numKeysDelta > 0) ? (" Current: [" + "keys/s=" + + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency=" + + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta) + + " ms]") : "") + + progressInfo()); + + if (streamingCounters) { + printStreamingCounters(numKeysDelta, + averageKeysPerSecond - priorAverageKeysPerSecond); + } + priorNumKeys = numKeys; + priorCumulativeOpTime = totalOpTime; + priorAverageKeysPerSecond = (int) averageKeysPerSecond; + + } + Threads.sleep(REPORTING_INTERVAL_MS); + } + LOG.info(getOverallRunInformation(priorNumKeys, time, priorCumulativeOpTime)); + isProgressReporterRunning = false; + } catch (Exception ex) { + isProgressReporterRunning = false; + LOG.error("ProgressReporterThread exited early: " + ex); } } @@ -248,6 +248,20 @@ public abstract class MultiThreadedAction { System.err.println("reporter:counter:avgKeysPerSecond," + reporterId + "," + (long) (avgKeysPerSecondDelta)); } + + private String getOverallRunInformation(long numKeys, long time, long totalOpTime) { + String overallSummary = "Keys=" + + numKeys + + ", cols=" + + StringUtils.humanReadableInt(numCols.get()) + + ", time=" + + formatTime(time) + + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= " + + numKeys * 1000 / time + ", latency=" + + String.format("%.2f", (double)totalOpTime / (double)numKeys) + + " ms]") : ""); + return (numThreadsWorking.get() == 0) ? "RUN SUMMARY: " + reporterId + " " + overallSummary: overallSummary; + } } public void close() { @@ -261,7 +275,7 @@ public abstract class MultiThreadedAction { } public void waitForFinish() { - while (numThreadsWorking.get() != 0) { + while (isProgressReporterRunning) { Threads.sleepWithoutInterrupt(1000); } close();