diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6f168b5..a674928 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2749,6 +2749,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "hive.tez.exec.inplace.progress",
         true,
         "Updates tez job execution progress in-place in the terminal."),
+    SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true,
+        "Updates spark job execution progress in-place in the terminal."),
     TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f,
         "This is to override the tez setting with the same name"),
     TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MIN("hive.tez.task.scale.memory.reserve-fraction.min",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index 5f0352a..b6d128b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -47,7 +47,7 @@ public int startMonitor() {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
 
-    long startTime = System.currentTimeMillis();
+    startTime = System.currentTimeMillis();
 
     while (true) {
       try {
@@ -58,7 +58,7 @@ public int startMonitor() {
 
         if (state == null) {
           long timeCount = (System.currentTimeMillis() - startTime)/1000;
-          if (timeCount > monitorTimeoutInteval) {
+          if (timeCount > monitorTimeoutInterval) {
             console.printError("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
             console.printError("Status: " + state);
             running = false;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 11f263b..bdb1527 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
+import java.util.Arrays;
 import java.util.Map;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -52,20 +53,17 @@ public int startMonitor() {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
 
-    long startTime = System.currentTimeMillis();
+    startTime = System.currentTimeMillis();
 
     while (true) {
       try {
        JobHandle.State state = sparkJobStatus.getRemoteJobState();
-        if (LOG.isDebugEnabled()) {
-          console.printInfo("state = " + state);
-        }
 
        switch (state) {
        case SENT:
        case QUEUED:
          long timeCount = (System.currentTimeMillis() - startTime) / 1000;
-          if ((timeCount > monitorTimeoutInteval)) {
+          if ((timeCount > monitorTimeoutInterval)) {
            console.printError("Job hasn't been submitted after " + timeCount + "s." +
                " Aborting it.\nPossible reasons include network issues, " +
                "errors in remote driver or the cluster has no available resources, etc.\n" +
@@ -75,6 +73,9 @@ public int startMonitor() {
            done = true;
            rc = 2;
          }
+          if (LOG.isDebugEnabled()) {
+            console.printInfo("state = " + state);
+          }
          break;
        case STARTED:
          JobExecutionStatus sparkJobState = sparkJobStatus.getState();
@@ -84,18 +85,20 @@ public int startMonitor() {
              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
              printAppInfo();
              // print job stages.
-              console.printInfo("\nQuery Hive on Spark job["
-                + sparkJobStatus.getJobId() + "] stages:");
-              for (int stageId : sparkJobStatus.getStageIds()) {
-                console.printInfo(Integer.toString(stageId));
-              }
+              console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId()
+                  + "] stages: " + Arrays.toString(sparkJobStatus.getStageIds()));
 
              console.printInfo("\nStatus: Running (Hive on Spark job["
                + sparkJobStatus.getJobId() + "])");
              running = true;
 
-              console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: "
-                  + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
+              String format = "Job Progress Format\nCurrentTime StageId_StageAttemptId: "
+                  + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount";
+              if (!inPlaceUpdate) {
+                console.printInfo(format);
+              } else {
+                console.logInfo(format);
+              }
            }
 
            printStatus(progressMap, lastProgressMap);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 0b6b15b..37dd3c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -19,11 +19,16 @@
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.fusesource.jansi.Ansi;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashSet;
@@ -33,35 +38,142 @@
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
+import static org.fusesource.jansi.Ansi.ansi;
+
 abstract class SparkJobMonitor {
 
   protected static final String CLASS_NAME = SparkJobMonitor.class.getName();
   protected static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-  protected static SessionState.LogHelper console = new SessionState.LogHelper(LOG);
+  protected transient final SessionState.LogHelper console;
   protected final PerfLogger perfLogger = SessionState.getPerfLogger();
   protected final int checkInterval = 1000;
-  protected final long monitorTimeoutInteval;
+  protected final long monitorTimeoutInterval;
 
   private final Set<String> completed = new HashSet<String>();
   private final int printInterval = 3000;
   private long lastPrintTime;
+  protected long startTime;
+
+  protected enum StageState {
+    PENDING,
+    RUNNING,
+    FINISHED
+  }
+
+  // in-place progress update related variables
+  protected final boolean inPlaceUpdate;
+  private int lines = 0;
+  private final PrintStream out;
+
+
+  private static final int COLUMN_1_WIDTH = 16;
+  private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s ";
+  private static final String STAGE_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s ";
+  private static final String HEADER = String.format(HEADER_FORMAT,
+      "STAGES", "ATTEMPT", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED");
+  private static final int SEPARATOR_WIDTH = 86;
+  private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-");
+  private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s";
+  private static final int progressBarChars = 30;
+
+  private final NumberFormat secondsFormat = new DecimalFormat("#0.00");
+
   protected SparkJobMonitor(HiveConf hiveConf) {
-    monitorTimeoutInteval = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
+    monitorTimeoutInterval = hiveConf.getTimeVar(
+        HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
+    inPlaceUpdate = InPlaceUpdates.inPlaceEligible(hiveConf);
+    console = SessionState.getConsole();
+    out = SessionState.LogHelper.getInfoStream();
   }
 
   public abstract int startMonitor();
 
+  private void printStatusInPlace(Map<String, SparkStageProgress> progressMap) {
+
+    StringBuilder reportBuffer = new StringBuilder();
+
+    // Num of total and completed tasks
+    int sumTotal = 0;
+    int sumComplete = 0;
+
+    // position the cursor to line 0
+    repositionCursor();
+
+    // header
+    reprintLine(SEPARATOR);
+    reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN);
+    reprintLine(SEPARATOR);
+
+    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+    int idx = 0;
+    final int numKey = keys.size();
+    for (String s : keys) {
+      SparkStageProgress progress = progressMap.get(s);
+      final int complete = progress.getSucceededTaskCount();
+      final int total = progress.getTotalTaskCount();
+      final int running = progress.getRunningTaskCount();
+      final int failed = progress.getFailedTaskCount();
+      sumTotal += total;
+      sumComplete += complete;
+
+      StageState state = total > 0 ? StageState.PENDING : StageState.FINISHED;
+      if (complete > 0 || running > 0 || failed > 0) {
+        if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
+          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+        }
+        if (complete < total) {
+          state = StageState.RUNNING;
+        } else {
+          state = StageState.FINISHED;
+          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+          completed.add(s);
+        }
+      }
+
+      int div = s.indexOf('_');
+      String attempt = div > 0 ? s.substring(div + 1) : "-";
+      String stageName = "Stage-" + (div > 0 ? s.substring(0, div) : s);
+      String nameWithProgress = getNameWithProgress(stageName, complete, total);
+
+      final int pending = total - complete - running;
+      String stageStr = String.format(STAGE_FORMAT,
+          nameWithProgress, attempt, state, total, complete, running, pending, failed);
+      reportBuffer.append(stageStr);
+      if (idx++ != numKey - 1) {
+        reportBuffer.append("\n");
+      }
+    }
+    reprintMultiLine(reportBuffer.toString());
+    reprintLine(SEPARATOR);
+    final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
+    String footer = getFooter(numKey, completed.size(), progress, startTime);
+    reprintLineWithColorAsBold(footer, Ansi.Color.RED);
+    reprintLine(SEPARATOR);
+  }
+
   protected void printStatus(Map<String, SparkStageProgress> progressMap,
-      Map<String, SparkStageProgress> lastProgressMap) {
+                             Map<String, SparkStageProgress> lastProgressMap) {
 
     // do not print duplicate status while still in middle of print interval.
     boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap);
-    boolean isPassedInterval = System.currentTimeMillis() <= lastPrintTime + printInterval;
-    if (isDuplicateState && isPassedInterval) {
+    boolean withinInterval = System.currentTimeMillis() <= lastPrintTime + printInterval;
+    if (isDuplicateState && withinInterval) {
       return;
     }
 
+    String report = getReport(progressMap);
+    if (inPlaceUpdate) {
+      printStatusInPlace(progressMap);
+      console.logInfo(report);
+    } else {
+      console.printInfo(report);
+    }
+
+    lastPrintTime = System.currentTimeMillis();
+  }
+
+  private String getReport(Map<String, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
     String currentDate = dt.format(new Date());
@@ -82,9 +194,9 @@ protected void printStatus(Map<String, SparkStageProgress> progressMap,
        completed.add(s);
 
        if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
-          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE);
+          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
        }
-        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE);
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
      }
      if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
        /* stage is started, but not complete */
@@ -93,24 +205,24 @@ protected void printStatus(Map<String, SparkStageProgress> progressMap,
        }
        if (failed > 0) {
          reportBuffer.append(
-            String.format(
-              "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, total));
+              String.format(
+                  "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, total));
        } else {
          reportBuffer.append(
-            String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total));
+              String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total));
        }
      } else {
        /* stage is waiting for input/slots or complete */
        if (failed > 0) {
          /* tasks finished but some failed */
          reportBuffer.append(
-            String.format(
-              "%s: %d(-%d)/%d Finished with failed tasks\t",
-              stageName, complete, failed, total));
+              String.format(
+                  "%s: %d(-%d)/%d Finished with failed tasks\t",
+                  stageName, complete, failed, total));
        } else {
          if (complete == total) {
            reportBuffer.append(
-              String.format("%s: %d/%d Finished\t", stageName, complete, total));
+                String.format("%s: %d/%d Finished\t", stageName, complete, total));
          } else {
            reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
          }
@@ -118,14 +230,12 @@ protected void printStatus(Map<String, SparkStageProgress> progressMap,
        }
      }
    }
-
-    lastPrintTime = System.currentTimeMillis();
-    console.printInfo(reportBuffer.toString());
+    return reportBuffer.toString();
  }
 
  private boolean isSameAsPreviousProgress(
-      Map<String, SparkStageProgress> progressMap,
-      Map<String, SparkStageProgress> lastProgressMap) {
+          Map<String, SparkStageProgress> progressMap,
+          Map<String, SparkStageProgress> lastProgressMap) {
 
    if (lastProgressMap == null) {
      return false;
@@ -142,7 +252,7 @@ private boolean isSameAsPreviousProgress(
    }
    for (String key : progressMap.keySet()) {
      if (!lastProgressMap.containsKey(key)
-        || !progressMap.get(key).equals(lastProgressMap.get(key))) {
+          || !progressMap.get(key).equals(lastProgressMap.get(key))) {
        return false;
      }
    }
@@ -150,4 +260,85 @@ private boolean isSameAsPreviousProgress(
    }
    return true;
  }
+
+  private void repositionCursor() {
+    if (lines > 0) {
+      out.print(ansi().cursorUp(lines).toString());
+      out.flush();
+      lines = 0;
+    }
+  }
+
+  private void reprintLine(String line) {
+    InPlaceUpdates.reprintLine(out, line);
+    lines++;
+  }
+
+  private void reprintLineWithColorAsBold(String line, Ansi.Color color) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset()
+        .toString());
+    out.flush();
+    lines++;
+  }
+
+  private String getNameWithProgress(String s, int complete, int total) {
+    String result = "";
+    if (s != null) {
+      float percent = total == 0 ? 1.0f : (float) complete / (float) total;
+      // lets use the remaining space in column 1 as progress bar
+      int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
+      String trimmedVName = s;
+
+      // if the vertex name is longer than column 1 width, trim it down
+      if (s.length() > COLUMN_1_WIDTH) {
+        trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2);
+        result = trimmedVName + "..";
+      } else {
+        result = trimmedVName + " ";
+      }
+
+      int toFill = (int) (spaceRemaining * percent);
+      for (int i = 0; i < toFill; i++) {
+        result += ".";
+      }
+    }
+    return result;
+  }
+
+  // STAGES: 03/04 [==================>>-----] 86% ELAPSED TIME: 1.71 s
+  private String getFooter(int keySize, int completedSize, float progress, long startTime) {
+    String verticesSummary = String.format("STAGES: %02d/%02d", completedSize, keySize);
+    String progressBar = getInPlaceProgressBar(progress);
+    final int progressPercent = (int) (progress * 100);
+    String progressStr = "" + progressPercent + "%";
+    float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000;
+    String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s";
+    String footer = String.format(FOOTER_FORMAT,
+        verticesSummary, progressBar, progressStr, elapsedTime);
+    return footer;
+  }
+
+  // [==================>>-----]
+  private String getInPlaceProgressBar(float percent) {
+    StringBuilder bar = new StringBuilder("[");
+    int remainingChars = progressBarChars - 4;
+    int completed = (int) (remainingChars * percent);
+    int pending = remainingChars - completed;
+    for (int i = 0; i < completed; i++) {
+      bar.append("=");
+    }
+    bar.append(">>");
+    for (int i = 0; i < pending; i++) {
+      bar.append("-");
+    }
+    bar.append("]");
+    return bar.toString();
+  }
+
+  private void reprintMultiLine(String line) {
+    int numLines = line.split("\r\n|\r|\n").length;
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+    lines += numLines;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
index 5b2d4e2..9879f93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InPlaceUpdates.java
@@ -63,11 +63,18 @@ static boolean isUnixTerminal() {
  }
 
  public static boolean inPlaceEligible(HiveConf conf) {
-    boolean inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
+    String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+    boolean inPlaceUpdates = false;
+    if (engine.equals("tez")) {
+      inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
+    }
+    if (engine.equals("spark")) {
+      inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS);
+    }
 
    // we need at least 80 chars wide terminal to display in-place updates properly
    return inPlaceUpdates && !SessionState.getConsole().getIsSilent() && isUnixTerminal()
-      && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
+        && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
  }
 
  public static void reprintLine(PrintStream out, String line) {
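
Note for reviewers (not part of the patch): the sketch below illustrates how the new knob interacts with the engine setting. The class name and standalone main are hypothetical; only HiveConf and the HIVE_EXECUTION_ENGINE and SPARK_EXEC_INPLACE_PROGRESS ConfVars touched by this diff are assumed. From the CLI the equivalent is simply: set hive.spark.exec.inplace.progress=false;

import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical, review-only snippet; not part of this patch.
public class SparkInPlaceProgressConfigCheck {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // The execution engine decides which flag InPlaceUpdates.inPlaceEligible() consults:
    //   tez   -> hive.tez.exec.inplace.progress
    //   spark -> hive.spark.exec.inplace.progress (added by this patch, default true)
    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");

    // Turning the flag off falls back to the plain per-interval progress lines,
    // e.g. when redirecting CLI output to a file.
    conf.setBoolVar(HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS, false);

    System.out.println("hive.spark.exec.inplace.progress = "
        + conf.getBoolVar(HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS));

    // Even with the flag on, SparkJobMonitor only renders in place when the console
    // is not silent, stdout is a Unix terminal, and the terminal is at least
    // 80 columns wide (see InPlaceUpdates.inPlaceEligible).
  }
}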