diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 96e6e61..c74df78 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1912,7 +1912,15 @@ TEZ_SMB_NUMBER_WAVES( "hive.tez.smb.number.waves", (float) 0.5, - "The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave.") + "The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave."), + TEZ_EXEC_SUMMARY( + "hive.tez.exec.print.summary", + false, + "Display breakdown of execution steps, for every query executed by the shell."), + TEZ_PROGRESS_IN_PLACE_UPDATES( + "hive.tez.progress.update.inplace", + true, + "Updates tez job execution progress in-place in the terminal.") ; public final String varname; diff --git pom.xml pom.xml index bd74830..a5f851f 100644 --- pom.xml +++ pom.xml @@ -129,6 +129,7 @@ 7.6.0.v20120127 1.14 0.9.94 + 1.11 1.1 3.5.2 20090211 @@ -336,6 +337,11 @@ ${jline.version} + org.fusesource.jansi + jansi + ${jansi.version} + + junit junit ${junit.version} diff --git ql/pom.xml ql/pom.xml index c373431..c3f1721 100644 --- ql/pom.xml +++ ql/pom.xml @@ -274,6 +274,16 @@ test + jline + jline + ${jline.version} + + + org.fusesource.jansi + jansi + ${jansi.version} + + org.apache.tez tez-api ${tez.version} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 9bcdeed..6c5daab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -19,9 +19,15 @@ package org.apache.hadoop.hive.ql.exec.tez; import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; +import static org.fusesource.jansi.Ansi.ansi; +import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO; +import static org.fusesource.jansi.internal.CLibrary.isatty; import java.io.IOException; +import java.io.PrintStream; +import java.text.DecimalFormat; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -33,15 +39,29 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Heartbeater; +import org.apache.hadoop.hive.ql.exec.MapOperator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.mapred.JobConf; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.Progress; import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.api.client.VertexStatus; +import org.fusesource.jansi.Ansi; + +import jline.Terminal; /** * TezJobMonitor keeps track of a tez job while it's being executed. It will @@ -50,16 +70,54 @@ */ public class TezJobMonitor { - private static final Log LOG = LogFactory.getLog(TezJobMonitor.class.getName()); private static final String CLASS_NAME = TezJobMonitor.class.getName(); + public static final boolean UNIX_TERMINAL = detectUnixTerminal(); + private static final int MIN_TERMINAL_WIDTH = 80; + private static final int COLUMN_1_WIDTH = 16; + private static final String HEADER = String.format("%16s %10s %5s %9s %7s %7s %6s %6s", + "VERTICES", + "STATUS", + "TOTAL", + "COMPLETED", + "RUNNING", + "PENDING", + "FAILED", + "KILLED"); + private static final String SEPARATOR = "-------------------------------------------------------------------------------"; + private static final String TOTAL_PREP_TIME = "TotalPrepTime"; + private static final String METHOD = "METHOD"; + private static final String DURATION = "DURATION_MS"; + private static final String VERTICES = "VERTICES"; + private static final String VERTEX_NAME = "VERTEX"; + private static final String TOTAL_TASKS = "TOTAL_TASKS"; + private static final String FAILED_TASKS = "FAILED_TASKS"; + private static final String KILLED_TASKS = "KILLED_TASKS"; + private static final String ELAPSED_TIME = "DURATION_SECONDS"; + private static final String CPU_TIME = "CPU_TIME_MILLIS"; + private static final String GC_TIME = "GC_TIME_MILLIS"; + private static final String INPUT_RECORDS = "INPUT_RECORDS"; + private static final String OUTPUT_RECORDS = "OUTPUT_RECORDS"; + + // in-place progress update related variables + private int lines; + private PrintStream out; private transient LogHelper console; private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); private final int checkInterval = 200; private final int maxRetryInterval = 2500; private final int printInterval = 3000; + private final int progressBarChars = 25; + private final int progressPercentChars = 3; + private final int progressDevisor = 100 / Math.min(100, progressBarChars); private long lastPrintTime; private Set completed; + + /* Pretty print the values */ + private final DecimalFormat msDurationFormatter; + private final DecimalFormat secondDurationFormatter; + private final DecimalFormat decimalFormatter; + private static final List shutdownList; static { @@ -83,20 +141,112 @@ public void run() { } public TezJobMonitor() { - console = new LogHelper(LOG); + console = SessionState.getConsole(); + msDurationFormatter = new DecimalFormat("###,###,###,###,###,###"); + secondDurationFormatter = new DecimalFormat("###,###,###,###,###,###0.00"); + decimalFormatter = new DecimalFormat("###,###,###,###,###,###,###"); + // all progress updates are written to info stream and log file. In-place updates can only be + // done to info stream (console) + out = console.getInfoStream(); + } + + private static boolean detectUnixTerminal() { + + String os = System.getProperty("os.name"); + if (os.startsWith("Windows")) { + // we do not support Windows, we will revisit this if we really need it for windows. + return false; + } + + // We must be on some unix variant.. + // check if standard out is a terminal + try { + // isatty system call will return 1 if the file descriptor is terminal else 0 + if (isatty(STDOUT_FILENO) == 0) { + return false; + } + } catch (NoClassDefFoundError ignore) { + // These errors happen if the JNI lib is not available for your platform. + return false; + } catch (UnsatisfiedLinkError ignore) { + // These errors happen if the JNI lib is not available for your platform. + return false; + } + return true; + } + + public boolean isUnixTerminal() { + return UNIX_TERMINAL; + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Erases the current line and prints the given line. + * @param line - line to print + */ + public void reprintLine(String line) { + out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); + out.flush(); + lines++; } /** - * monitorExecution handles status printing, failures during execution and final - * status retrieval. + * NOTE: Use this method only if isUnixTerminal is true. + * Erases the current line and prints the given line with the specified color. + * @param line - line to print + * @param color - color for the line + */ + public void reprintLineWithColorAsBold(String line, Ansi.Color color) { + out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset() + .toString()); + out.flush(); + lines++; + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Erases the current line and prints the given multiline. Make sure the specified line is not + * terminated by linebreak. + * @param line - line to print + */ + public void reprintMultiLine(String line) { + int numLines = line.split("\r\n|\r|\n").length; + out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); + out.flush(); + lines += numLines; + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Repositions the cursor back to line 0. + */ + public void repositionCursor() { + if (lines > 0) { + out.print(ansi().cursorUp(lines).toString()); + out.flush(); + lines = 0; + } + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Gets the width of the terminal + * @return - width of terminal + */ + public int getTerminalWidth() { + return Terminal.getTerminal().getTerminalWidth(); + } + + /** + * monitorExecution handles status printing, failures during execution and final status retrieval. * * @param dagClient client that was used to kick off the job * @param txnMgr transaction manager for this operation * @param conf configuration file for this operation * @return int 0 - success, 1 - killed, 2 - failed */ - public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, - HiveConf conf) throws InterruptedException { + public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf, + JobConf jobConf, DAG dag) throws InterruptedException { DAGStatus status = null; completed = new HashSet(); @@ -109,6 +259,14 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, Set opts = new HashSet(); Heartbeater heartbeater = new Heartbeater(txnMgr, conf); long startTime = 0; + boolean isProfileEnabled = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY); + boolean inPlaceUpdates = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_PROGRESS_IN_PLACE_UPDATES); + boolean wideTerminal = false; + if (isUnixTerminal()) { + if (getTerminalWidth() >= MIN_TERMINAL_WIDTH) { + wideTerminal = true; + } + } shutdownList.add(dagClient); @@ -116,7 +274,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); - while(true) { + while (true) { try { status = dagClient.getDAGStatus(opts); @@ -127,7 +285,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, if (state != lastState || state == RUNNING) { lastState = state; - switch(state) { + switch (state) { case SUBMITTED: console.printInfo("Status: Submitted"); break; @@ -138,19 +296,36 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, if (!running) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n"); - for (String s: progressMap.keySet()) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); - } startTime = System.currentTimeMillis(); running = true; } - lastReport = printStatus(progressMap, lastReport, console); + if (inPlaceUpdates && isUnixTerminal() && wideTerminal && !console.getIsSilent()) { + printStatusInPlace(progressMap, console, startTime); + } else { + lastReport = printStatus(progressMap, lastReport, console); + } break; case SUCCEEDED: - lastReport = printStatus(progressMap, lastReport, console); - double duration = (System.currentTimeMillis() - startTime)/1000.0; - console.printInfo("Status: Finished successfully in " + String.format("%.2f seconds", duration)); + if (inPlaceUpdates && isUnixTerminal() && wideTerminal && !console.getIsSilent()) { + printStatusInPlace(progressMap, console, startTime); + } else { + lastReport = printStatus(progressMap, lastReport, console); + } + + /* Profile info is collected anyways, isProfileEnabled + * decides if it gets printed or not + */ + if (isProfileEnabled) { + + double duration = (System.currentTimeMillis() - startTime) / 1000.0; + console.printInfo("Status: DAG finished successfully in " + + String.format("%.2f seconds", duration)); + console.printInfo("\n"); + + printMethodsSummary(); + printDagSummary(progressMap, console, dagClient, conf, jobConf, dag); + } running = false; done = true; break; @@ -173,15 +348,15 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, Thread.sleep(checkInterval); } } catch (Exception e) { - console.printInfo("Exception: "+e.getMessage()); - if (++failedCounter % maxRetryInterval/checkInterval == 0 + console.printInfo("Exception: " + e.getMessage()); + if (++failedCounter % maxRetryInterval / checkInterval == 0 || e instanceof InterruptedException) { try { console.printInfo("Killing DAG..."); dagClient.tryKillDAG(); - } catch(IOException io) { + } catch (IOException io) { // best effort - } catch(TezException te) { + } catch (TezException te) { // best effort } e.printStackTrace(); @@ -194,7 +369,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, } finally { if (done) { if (rc != 0 && status != null) { - for (String diag: status.getDiagnostics()) { + for (String diag : status.getDiagnostics()) { console.printError(diag); } } @@ -222,42 +397,454 @@ public static void killRunningJobs() { } } + private String getProgressBar(int percent) { + StringBuilder bar = new StringBuilder("["); + String percentageString = String.format("%1$" + progressPercentChars + "s", percent) + "%"; + int reservedCenterChars = percentageString.length() + 2; + + int startPrintRange = (progressBarChars - reservedCenterChars) / 2; + int endPrintRange = (progressBarChars + reservedCenterChars) / 2 - 1; + + for (int i = 0; i < progressBarChars; i++) { + + if ((i < (percent / progressDevisor) && ((i < startPrintRange || i > endPrintRange)))) { + bar.append("="); + } else if (i == (progressBarChars - reservedCenterChars) / 2) { + bar.append(percentageString); + i += percentageString.length(); + } else if (i == (percent / progressDevisor) && ((i < startPrintRange || i > endPrintRange))) { + bar.append(">"); + } else { + bar.append(" "); + } + } + + bar.append("] "); + return bar.toString(); + } + + private String getAggregatedVerticesStatus(int verticesCount, int completedVertices, int runningVertices) { + + StringBuilder reportBuffer = new StringBuilder(); + + if (runningVertices > 0) { + reportBuffer.append(String.format("%s: %d(+%d)/%d\t", VERTICES, completedVertices, + runningVertices, verticesCount)); + } else { + reportBuffer.append(String + .format("%s: %d/%d\t", VERTICES, completedVertices, verticesCount)); + } + + return reportBuffer.toString(); + } + + private static long getCounterValueByGroupName(TezCounters vertexCounters, String groupNamePattern, + String counterName) { + + TezCounter tezCounter; + long counterValue = 0; + + for (String groupName : vertexCounters.getGroupNames()) { + if (groupName.equals(groupNamePattern)) { + tezCounter = vertexCounters.getGroup(groupName).findCounter(counterName); + counterValue += (tezCounter != null) ? tezCounter.getValue() : 0; + } + } + + return counterValue; + } + + private void printMethodsSummary() { + double totalInPrepTime = 0; + + String[] perfLoggerReportMethods = { + (PerfLogger.PARSE), + (PerfLogger.ANALYZE), + (PerfLogger.TEZ_BUILD_DAG), + (PerfLogger.TEZ_SUBMIT_TO_RUNNING) + }; + + /* Build the method summary header */ + String methodBreakdownHeader = String.format("%-30s %-13s", METHOD, DURATION); + console.printInfo(methodBreakdownHeader); + + for (String method : perfLoggerReportMethods) { + long duration = perfLogger.getDuration(method); + totalInPrepTime += duration; + console.printInfo(String.format("%-30s %11s", method, msDurationFormatter.format(duration))); + } + + /* + * The counters list above don't capture the total time from TimeToSubmit.startTime till + * TezRunDag.startTime, so calculate the duration and print it. + */ + totalInPrepTime = + perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) + - perfLogger.getStartTime(PerfLogger.TIME_TO_SUBMIT); + + console.printInfo(String.format("%-30s %11s", TOTAL_PREP_TIME, + msDurationFormatter.format(totalInPrepTime))); + + console.printInfo("\n"); + } + + private void printDagSummary(Map progressMap, LogHelper console, + DAGClient dagClient, HiveConf conf, JobConf jobConf, DAG dag) throws IOException, + TezException { + + VertexStatus vertexStatus; + TezCounters vertexCounters; + TezCounter tezCounter; + double cpuTimeMillis = 0; + double gcTimeMillis = 0; + double hiveInputRecords = 0; + double hiveOutputRecords = 0; + double hiveOutputIntermediateRecords = 0; + double hiveInputRecordsFromOtherVertices = 0; + double duration = 0; + + int totalTasks = 0; + int failedTasks = 0; + int killedTasks = 0; + + /* Strings for headers and counters */ + String hiveCountersGroup = conf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP); + String vertexExecutionStats; + + /* Build the header for the per vertex summary */ + String vertexSummaryHeader = + String.format("%-12s %-12s %-12s %-15s %-20s %-15s %-15s %-15s %-15s", VERTEX_NAME, + TOTAL_TASKS, FAILED_TASKS, KILLED_TASKS, ELAPSED_TIME, CPU_TIME, GC_TIME, + INPUT_RECORDS, OUTPUT_RECORDS); + + /* Print the per Vertex summary */ + console.printInfo(vertexSummaryHeader); + + Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); + TezCounters hiveCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters(); + + /* If the counters are missing there is no point trying to print progress */ + if (hiveCounters == null) { + return; + } + + SortedSet keys = new TreeSet(progressMap.keySet()); + + Set statusOptions = new HashSet(1); + statusOptions.add(StatusGetOpts.GET_COUNTERS); + + for (String vertexName : keys) { + Progress progress = progressMap.get(vertexName); + + /* Be extra careful not to hit NPE */ + if (progress == null) { + continue; + } + + totalTasks = progress.getTotalTaskCount(); + failedTasks = progress.getFailedTaskCount(); + killedTasks = progress.getKilledTaskCount(); + hiveInputRecordsFromOtherVertices = 0; + + duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName) / 1000.0; + + vertexStatus = dagClient.getVertexStatus(vertexName, statusOptions); + + if (vertexStatus == null) { + continue; + } + + Vertex currentVertex = dag.getVertex(vertexName); + List inputVerticesList = currentVertex.getInputVertices(); + if (inputVerticesList.size() > 0) { + + for (Vertex inputVertex : inputVerticesList) { + + String inputVertexName = inputVertex.getName(); + + hiveInputRecordsFromOtherVertices += + getCounterValueByGroupName( + hiveCounters, + hiveCountersGroup, + String.format("%s_", + ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()) + + inputVertexName.replace(" ", "_")); + + hiveInputRecordsFromOtherVertices += + getCounterValueByGroupName(hiveCounters, hiveCountersGroup, + String.format("%s_", FileSinkOperator.Counter.RECORDS_OUT.toString()) + + inputVertexName.replace(" ", "_")); + } + } + + /* + * Get the CPU & GC + * + * counters org.apache.tez.common.counters.TaskCounter + * GC_TIME_MILLIS=37712 + * CPU_MILLISECONDS=2774230 + */ + vertexCounters = vertexStatus.getVertexCounters(); + + cpuTimeMillis = + getCounterValueByGroupName(vertexCounters, TaskCounter.class.getName(), + TaskCounter.CPU_MILLISECONDS.name()); + + gcTimeMillis = + getCounterValueByGroupName(vertexCounters, TaskCounter.class.getName(), + TaskCounter.GC_TIME_MILLIS.name()); + + /* + * Get the HIVE counters + * + * HIVE + * CREATED_FILES=1 + * DESERIALIZE_ERRORS=0 + * RECORDS_IN_Map_1=550076554 + * RECORDS_OUT_INTERMEDIATE_Map_1=854987 + * RECORDS_OUT_Reducer_2=1 + */ + hiveInputRecords = + getCounterValueByGroupName( + hiveCounters, + hiveCountersGroup, + String.format("%s_", MapOperator.Counter.RECORDS_IN.toString()) + + vertexName.replace(" ", "_")); + + hiveInputRecords += hiveInputRecordsFromOtherVertices; + + hiveOutputRecords = + getCounterValueByGroupName( + hiveCounters, + hiveCountersGroup, + String.format("%s_", FileSinkOperator.Counter.RECORDS_OUT.toString()) + + vertexName.replace(" ", "_")); + + hiveOutputIntermediateRecords = + getCounterValueByGroupName(hiveCounters, hiveCountersGroup, + String.format("%s_", ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()) + + vertexName.replace(" ", "_")); + + hiveOutputRecords += hiveOutputIntermediateRecords; + + vertexExecutionStats = + String.format("%-12s %11s %13s %12s %19s %19s %13s %15s %16s", vertexName, + Long.toString(totalTasks), Long.toString(failedTasks), Long.toString(killedTasks), + secondDurationFormatter.format((duration)), decimalFormatter.format(cpuTimeMillis), + decimalFormatter.format(gcTimeMillis), decimalFormatter.format(hiveInputRecords), + decimalFormatter.format(hiveOutputRecords)); + + console.printInfo(vertexExecutionStats); + } + } + + private void printStatusInPlace(Map progressMap, LogHelper console, + long startTime) { + StringBuffer reportBuffer = new StringBuffer(); + int sumComplete = 0; + int sumTotal = 0; + + // position the cursor to line 0 + repositionCursor(); + + // print header + // ------------------------------------------------------------------------------- + // VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED + // ------------------------------------------------------------------------------- + reprintLine(SEPARATOR); + reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN); + reprintLine(SEPARATOR); + + SortedSet keys = new TreeSet(progressMap.keySet()); + int idx = 0; + int maxKeys = keys.size(); + for (String s : keys) { + idx++; + Progress progress = progressMap.get(s); + final int complete = progress.getSucceededTaskCount(); + final int total = progress.getTotalTaskCount(); + final int running = progress.getRunningTaskCount(); + final int failed = progress.getFailedTaskCount(); + final int pending = progress.getTotalTaskCount() - progress.getSucceededTaskCount() - + progress.getRunningTaskCount(); + final int killed = progress.getKilledTaskCount(); + + // we don't really have state information for vertex. Let's mimic the vertex state using the + // available task counts + DAGStatus.State vertexState = DAGStatus.State.SUBMITTED; + sumComplete += complete; + sumTotal += total; + + if (total > 0) { + if (complete == total) { + vertexState = DAGStatus.State.SUCCEEDED; + if (!completed.contains(s)) { + completed.add(s); + + /* We may have missed the start of the vertex + * due to the 3 seconds interval + */ + if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + } + + if (complete < total && (complete > 0 || running > 0 || failed > 0)) { + vertexState = DAGStatus.State.RUNNING; + if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + } + // Map 1 .......... SUCCEEDED 7 7 0 0 0 0 + String nameWithProgress = getNameWithProgress(s, complete, total); + String vertexStr = String.format("%-16s %10s %5s %9s %7s %7s %6s %6s", + nameWithProgress, + vertexState.toString(), + total, + complete, + running, + pending, + failed, + killed); + reportBuffer.append(vertexStr); + if (idx != maxKeys) { + reportBuffer.append("\n"); + } + } + } + + reprintMultiLine(reportBuffer.toString()); + + // ------------------------------------------------------------------------------- + // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s + // ------------------------------------------------------------------------------- + reprintLine(SEPARATOR); + final float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal; + String footer = getFooter(keys.size(), completed.size(), progress, startTime); + reprintLineWithColorAsBold(footer, Ansi.Color.RED); + reprintLine(SEPARATOR); + } + + // Map 1 .......... + private String getNameWithProgress(String s, int complete, int total) { + float percent = total == 0 ? 0.0f : (float) complete / (float) total; + // lets use the remaining space in column 1 as progress bar + int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1; + String result = s + " "; + int toFill = (int) (spaceRemaining * percent); + for (int i = 0; i < toFill; i++) { + result += "."; + } + return result; + } + + // VERTICES: 03/04 [==================>>-----] 86% ELAPSED TIME: 1.71 s + private String getFooter(int keySize, int completedSize, float progress, long startTime) { + String verticesSummary = String.format("VERTICES: %02d/%02d", completedSize, keySize); + String progressBar = getInPlaceProgressBar(progress); + final int progressPercent = (int) (progress * 100); + String progressStr = "" + progressPercent + "%"; + float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000; + String elapsedTime = "ELAPSED TIME: " + secondDurationFormatter.format(et) + " s"; + String footer = String.format("%-15s %-25s %-4s %-25s", + verticesSummary, progressBar, progressStr, elapsedTime); + return footer; + } + + // [==================>>-----] + private String getInPlaceProgressBar(float percent) { + StringBuilder bar = new StringBuilder("["); + int remainingChars = progressBarChars - 4; + int completed = (int) (remainingChars * percent); + int pending = remainingChars - completed; + for (int i = 0; i < completed; i++) { + bar.append("="); + } + bar.append(">>"); + for (int i = 0; i < pending; i++) { + bar.append("-"); + } + bar.append("]"); + return bar.toString(); + } + private String printStatus(Map progressMap, String lastReport, LogHelper console) { StringBuffer reportBuffer = new StringBuffer(); + StringBuffer runningVertices = new StringBuffer(); + int progressPercentage = 0; + int sumComplete = 0; + int sumTotal = 0; + + int runningVerticesCount = 0; SortedSet keys = new TreeSet(progressMap.keySet()); - for (String s: keys) { + for (String s : keys) { Progress progress = progressMap.get(s); final int complete = progress.getSucceededTaskCount(); final int total = progress.getTotalTaskCount(); final int running = progress.getRunningTaskCount(); final int failed = progress.getFailedTaskCount(); - if (total <= 0) { - reportBuffer.append(String.format("%s: -/-\t", s, complete, total)); - } else { + sumComplete += complete; + sumTotal += total; + + if (total > 0) { if (complete == total && !completed.contains(s)) { + completed.add(s); + + /* We may have missed the start of the vertex + * due to the 3 seconds interval + */ + if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); } - if(complete < total && (complete > 0 || running > 0 || failed > 0)) { + + if (complete < total && (complete > 0 || running > 0 || failed > 0)) { + + if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + + runningVerticesCount++; + /* vertex is started, but not complete */ if (failed > 0) { - reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total)); + runningVertices.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, + failed, total)); } else { - reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total)); + runningVertices.append(String.format(String.format("%s: %d(+%d)/%d\t", s, complete, + running, total))); } + } else { - /* vertex is waiting for input/slots or complete */ + if (failed > 0) { /* tasks finished but some failed */ - reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total)); - } else { - reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total)); + runningVertices.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total)); } } } } + progressPercentage = (sumTotal == 0) ? 0 : 100 * sumComplete / sumTotal; + + String verticesSummary = + getAggregatedVerticesStatus(keys.size(), completed.size(), runningVerticesCount); + + String progressBar = getProgressBar(progressPercentage); + + reportBuffer.append(String.format("%12s %12s ", progressBar, verticesSummary)); + + if (runningVertices.length() > 0) { + reportBuffer.append(String.format("%s: ", "RUNNING")).append(runningVertices); + } + String report = reportBuffer.toString(); if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) { console.printInfo(report); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 337f2f4..ac4903a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -162,7 +162,7 @@ public int execute(DriverContext driverContext) { // finally monitor will print progress until the job is done TezJobMonitor monitor = new TezJobMonitor(); - rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf); + rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, jobConf, dag); // fetch the counters Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); diff --git ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java index c4327fc..0325da6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java +++ ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java @@ -147,10 +147,37 @@ public void close(Log _log, QueryPlan queryPlan) { } public Long getStartTime(String method) { - return startTimes.get(method); + Long startTime = 0L; + + if (startTimes.containsKey(method)) { + startTime = startTimes.get(method); + } + return startTime; } public Long getEndTime(String method) { - return endTimes.get(method); + Long endTime = 0L; + + if (endTimes.containsKey(method)) { + endTime = endTimes.get(method); + } + return endTime; } + + public boolean startTimeHasMethod(String method) { + return startTimes.containsKey(method); + } + + public boolean endTimeHasMethod(String method) { + return endTimes.containsKey(method); + } + + public Long getDuration(String method) { + long duration = 0; + if (startTimes.containsKey(method) && endTimes.containsKey(method)) { + duration = endTimes.get(method) - startTimes.get(method); + } + return duration; + } + }