commit cf2cef87c2686b191c7f561bde76a7e97f2ec37e Author: Bharath Krishna Date: Tue May 22 20:02:30 2018 -0700 HIVE-19602 : Refactor inplace progress code in Hive-on-spark progress monitor to use ProgressMonitor instance diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java index 7afd8864075aa0d9708274eea8839c662324c732..3518c55fdb6c572f83ca30cc060a6ddbfffd2ac6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.spark.status; +import org.apache.hadoop.hive.common.log.ProgressMonitor; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.common.log.InPlaceUpdate; import org.apache.hadoop.hive.ql.log.PerfLogger; @@ -48,6 +49,7 @@ protected final PerfLogger perfLogger = SessionState.getPerfLogger(); protected final int checkInterval = 1000; protected final long monitorTimeoutInterval; + private final InPlaceUpdate inPlaceUpdateFn; private final Set completed = new HashSet(); private final int printInterval = 3000; @@ -61,94 +63,21 @@ FINISHED } - // in-place progress update related variables protected final boolean inPlaceUpdate; - private int lines = 0; - private final PrintStream out; - - private static final int COLUMN_1_WIDTH = 16; - private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s "; - private static final String STAGE_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s "; - private static final String HEADER = String.format(HEADER_FORMAT, - "STAGES", "ATTEMPT", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED"); - private static final int SEPARATOR_WIDTH = 86; - private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-"); - private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s"; - private static final int progressBarChars = 30; - - private final NumberFormat secondsFormat = new DecimalFormat("#0.00"); protected SparkJobMonitor(HiveConf hiveConf) { monitorTimeoutInterval = hiveConf.getTimeVar( HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS); inPlaceUpdate = InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent(); console = new SessionState.LogHelper(LOG); - out = SessionState.LogHelper.getInfoStream(); + inPlaceUpdateFn = new InPlaceUpdate(SessionState.LogHelper.getInfoStream()); } public abstract int startMonitor(); private void printStatusInPlace(Map progressMap) { + inPlaceUpdateFn.render(getProgressMonitor(progressMap)); - StringBuilder reportBuffer = new StringBuilder(); - - // Num of total and completed tasks - int sumTotal = 0; - int sumComplete = 0; - - // position the cursor to line 0 - repositionCursor(); - - // header - reprintLine(SEPARATOR); - reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN); - reprintLine(SEPARATOR); - - SortedSet keys = new TreeSet(progressMap.keySet()); - int idx = 0; - final int numKey = keys.size(); - for (String s : keys) { - SparkStageProgress progress = progressMap.get(s); - final int complete = progress.getSucceededTaskCount(); - final int total = progress.getTotalTaskCount(); - final int running = progress.getRunningTaskCount(); - final int failed = progress.getFailedTaskCount(); - sumTotal += total; - sumComplete += complete; - - StageState state = total > 0 ? StageState.PENDING : StageState.FINISHED; - if (complete > 0 || running > 0 || failed > 0) { - if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s); - } - if (complete < total) { - state = StageState.RUNNING; - } else { - state = StageState.FINISHED; - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s); - completed.add(s); - } - } - - int div = s.indexOf('_'); - String attempt = div > 0 ? s.substring(div + 1) : "-"; - String stageName = "Stage-" + (div > 0 ? s.substring(0, div) : s); - String nameWithProgress = getNameWithProgress(stageName, complete, total); - - final int pending = total - complete - running; - String stageStr = String.format(STAGE_FORMAT, - nameWithProgress, attempt, state, total, complete, running, pending, failed); - reportBuffer.append(stageStr); - if (idx++ != numKey - 1) { - reportBuffer.append("\n"); - } - } - reprintMultiLine(reportBuffer.toString()); - reprintLine(SEPARATOR); - final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal; - String footer = getFooter(numKey, completed.size(), progress, startTime); - reprintLineWithColorAsBold(footer, Ansi.Color.RED); - reprintLine(SEPARATOR); } protected void printStatus(Map progressMap, @@ -293,84 +222,7 @@ private boolean isSameAsPreviousProgress( return true; } - private void repositionCursor() { - if (lines > 0) { - out.print(ansi().cursorUp(lines).toString()); - out.flush(); - lines = 0; - } - } - - private void reprintLine(String line) { - InPlaceUpdate.reprintLine(out, line); - lines++; - } - - private void reprintLineWithColorAsBold(String line, Ansi.Color color) { - out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset() - .toString()); - out.flush(); - lines++; - } - - private String getNameWithProgress(String s, int complete, int total) { - String result = ""; - if (s != null) { - float percent = total == 0 ? 1.0f : (float) complete / (float) total; - // lets use the remaining space in column 1 as progress bar - int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1; - String trimmedVName = s; - - // if the vertex name is longer than column 1 width, trim it down - if (s.length() > COLUMN_1_WIDTH) { - trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2); - result = trimmedVName + ".."; - } else { - result = trimmedVName + " "; - } - - int toFill = (int) (spaceRemaining * percent); - for (int i = 0; i < toFill; i++) { - result += "."; - } - } - return result; - } - - // STAGES: 03/04 [==================>>-----] 86% ELAPSED TIME: 1.71 s - private String getFooter(int keySize, int completedSize, float progress, long startTime) { - String verticesSummary = String.format("STAGES: %02d/%02d", completedSize, keySize); - String progressBar = getInPlaceProgressBar(progress); - final int progressPercent = (int) (progress * 100); - String progressStr = "" + progressPercent + "%"; - float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000; - String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s"; - String footer = String.format(FOOTER_FORMAT, - verticesSummary, progressBar, progressStr, elapsedTime); - return footer; - } - - // [==================>>-----] - private String getInPlaceProgressBar(float percent) { - StringBuilder bar = new StringBuilder("["); - int remainingChars = progressBarChars - 4; - int completed = (int) (remainingChars * percent); - int pending = remainingChars - completed; - for (int i = 0; i < completed; i++) { - bar.append("="); - } - bar.append(">>"); - for (int i = 0; i < pending; i++) { - bar.append("-"); - } - bar.append("]"); - return bar.toString(); - } - - private void reprintMultiLine(String line) { - int numLines = line.split("\r\n|\r|\n").length; - out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); - out.flush(); - lines += numLines; + private SparkProgressMonitor getProgressMonitor(Map progressMap) { + return new SparkProgressMonitor(progressMap, startTime); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java new file mode 100644 index 0000000000000000000000000000000000000000..3c0061c5e69c3a9af9ed40acf5db8aa5a3a10d95 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.status; + +import org.apache.hadoop.hive.common.log.ProgressMonitor; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * This class defines various parts of the progress update bar. + * Progressbar is displayed in hive-cli and typically rendered using InPlaceUpdate. + */ +class SparkProgressMonitor implements ProgressMonitor { + + private Map progressMap; + private long startTime; + private static final int COLUMN_1_WIDTH = 16; + + SparkProgressMonitor(Map progressMap, long startTime) { + this.progressMap = progressMap; + this.startTime = startTime; + } + + @Override + public List headers() { + return Arrays + .asList("STAGES", "ATTEMPT", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", + ""); + } + + @Override + public List> rows() { + List> progressRows = new ArrayList<>(); + SortedSet keys = new TreeSet(progressMap.keySet()); + for (String s : keys) { + SparkStageProgress progress = progressMap.get(s); + final int complete = progress.getSucceededTaskCount(); + final int total = progress.getTotalTaskCount(); + final int running = progress.getRunningTaskCount(); + final int failed = progress.getFailedTaskCount(); + + SparkJobMonitor.StageState state = + total > 0 ? SparkJobMonitor.StageState.PENDING : SparkJobMonitor.StageState.FINISHED; + if (complete > 0 || running > 0 || failed > 0) { + if (complete < total) { + state = SparkJobMonitor.StageState.RUNNING; + } else { + state = SparkJobMonitor.StageState.FINISHED; + } + } + + int div = s.indexOf('_'); + String attempt = div > 0 ? s.substring(div + 1) : "-"; + String stageName = "Stage-" + (div > 0 ? s.substring(0, div) : s); + String nameWithProgress = getNameWithProgress(stageName, complete, total); + final int pending = total - complete - running; + + progressRows.add(Arrays + .asList(nameWithProgress, attempt, state.toString(), String.valueOf(total), + String.valueOf(complete), String.valueOf(running), String.valueOf(pending), + String.valueOf(failed), "")); + } + return progressRows; + } + + @Override + public String footerSummary() { + return String.format("STAGES: %02d/%02d", getCompletedStages(), progressMap.keySet().size()); + } + + @Override + public long startTime() { + return startTime; + } + + @Override + public String executionStatus() { + if (getCompletedStages() == progressMap.keySet().size()) { + return SparkJobMonitor.StageState.FINISHED.toString(); + } else { + return SparkJobMonitor.StageState.RUNNING.toString(); + } + } + + @Override + public double progressedPercentage() { + + SortedSet keys = new TreeSet(progressMap.keySet()); + int sumTotal = 0; + int sumComplete = 0; + for (String s : keys) { + SparkStageProgress progress = progressMap.get(s); + final int complete = progress.getSucceededTaskCount(); + final int total = progress.getTotalTaskCount(); + sumTotal += total; + sumComplete += complete; + } + double progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal; + return progress; + } + + private int getCompletedStages() { + int completed = 0; + SortedSet keys = new TreeSet(progressMap.keySet()); + for (String s : keys) { + SparkStageProgress progress = progressMap.get(s); + final int complete = progress.getSucceededTaskCount(); + final int total = progress.getTotalTaskCount(); + if (total > 0 && complete == total) { + completed++; + } + } + return completed; + } + + private String getNameWithProgress(String s, int complete, int total) { + String result = ""; + if (s != null) { + float percent = total == 0 ? 1.0f : (float) complete / (float) total; + // lets use the remaining space in column 1 as progress bar + int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1; + String trimmedVName = s; + + // if the vertex name is longer than column 1 width, trim it down + if (s.length() > COLUMN_1_WIDTH) { + trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2); + result = trimmedVName + ".."; + } else { + result = trimmedVName + " "; + } + + int toFill = (int) (spaceRemaining * percent); + for (int i = 0; i < toFill; i++) { + result += "."; + } + } + return result; + } +} \ No newline at end of file