diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1e35e44..fe227b0 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1912,7 +1912,15 @@ TEZ_SMB_NUMBER_WAVES( "hive.tez.smb.number.waves", (float) 0.5, - "The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave.") + "The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave."), + TEZ_EXEC_SUMMARY( + "hive.tez.exec.print.summary", + false, + "Display breakdown of execution steps, for every query executed by the shell."), + TEZ_EXEC_INPLACE_PROGRESS( + "hive.tez.exec.inplace.progress", + true, + "Updates tez job execution progress in-place in the terminal.") ; public final String varname; diff --git pom.xml pom.xml index bd74830..a5f851f 100644 --- pom.xml +++ pom.xml @@ -129,6 +129,7 @@ 7.6.0.v20120127 1.14 0.9.94 + 1.11 1.1 3.5.2 20090211 @@ -336,6 +337,11 @@ ${jline.version} + org.fusesource.jansi + jansi + ${jansi.version} + + junit junit ${junit.version} diff --git ql/pom.xml ql/pom.xml index c373431..c3f1721 100644 --- ql/pom.xml +++ ql/pom.xml @@ -274,6 +274,16 @@ test + jline + jline + ${jline.version} + + + org.fusesource.jansi + jansi + ${jansi.version} + + org.apache.tez tez-api ${tez.version} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 9bcdeed..284acbc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -19,29 +19,48 @@ package org.apache.hadoop.hive.ql.exec.tez; import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; +import static org.fusesource.jansi.Ansi.ansi; +import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO; +import static org.fusesource.jansi.internal.CLibrary.isatty; -import java.io.IOException; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Heartbeater; +import org.apache.hadoop.hive.ql.exec.MapOperator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.Progress; import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.api.client.VertexStatus; +import org.fusesource.jansi.Ansi; + +import java.io.IOException; +import java.io.PrintStream; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import jline.Terminal; /** * TezJobMonitor keeps track of a tez job while it's being executed. It will @@ -50,16 +69,47 @@ */ public class TezJobMonitor { - private static final Log LOG = LogFactory.getLog(TezJobMonitor.class.getName()); private static final String CLASS_NAME = TezJobMonitor.class.getName(); + private static final int MIN_TERMINAL_WIDTH = 80; + private static final int COLUMN_1_WIDTH = 16; + private static final int SEPARATOR_WIDTH = 80; + + // keep this within 80 chars width. If more columns needs to be added then update min terminal + // width requirement and separator width accordingly + private static final String HEADER_FORMAT = "%16s%12s %5s %9s %7s %7s %6s %6s"; + private static final String VERTEX_FORMAT = "%-16s%12s %5s %9s %7s %7s %6s %6s"; + private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s"; + private static final String HEADER = String.format(HEADER_FORMAT, + "VERTICES", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED"); + + // method and dag summary format + private static final String SUMMARY_HEADER_FORMAT = "%-16s %-12s %-12s %-15s %-20s %-15s %-15s %-15s %-15s"; + private static final String SUMMARY_VERTEX_FORMAT = "%-12s %11s %13s %12s %19s %19s %13s %15s %16s"; + private static final String SUMMARY_HEADER = String.format(SUMMARY_HEADER_FORMAT, + "VERTICES", "TOTAL_TASKS", "FAILED_ATTEMPTS", "KILLED_TASKS", "DURATION_SECONDS", + "CPU_TIME_MILLIS", "GC_TIME_MILLIS", "INPUT_RECORDS", "OUTPUT_RECORDS"); + + private static final String TOTAL_PREP_TIME = "TotalPrepTime"; + private static final String METHOD = "METHOD"; + private static final String DURATION = "DURATION(ms)"; + + // in-place progress update related variables + private int lines; + private PrintStream out; + private String separator; private transient LogHelper console; private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); private final int checkInterval = 200; private final int maxRetryInterval = 2500; private final int printInterval = 3000; + private final int progressBarChars = 30; private long lastPrintTime; private Set completed; + + /* Pretty print the values */ + private final NumberFormat secondsFormat; + private final NumberFormat commaFormat; private static final List shutdownList; static { @@ -83,20 +133,111 @@ public void run() { } public TezJobMonitor() { - console = new LogHelper(LOG); + console = SessionState.getConsole(); + secondsFormat = new DecimalFormat("#0.00"); + commaFormat = NumberFormat.getNumberInstance(Locale.US); + // all progress updates are written to info stream and log file. In-place updates can only be + // done to info stream (console) + out = console.getInfoStream(); + separator = ""; + for (int i = 0; i < SEPARATOR_WIDTH; i++) { + separator += "-"; + } + } + + private static boolean isUnixTerminal() { + + String os = System.getProperty("os.name"); + if (os.startsWith("Windows")) { + // we do not support Windows, we will revisit this if we really need it for windows. + return false; + } + + // We must be on some unix variant.. + // check if standard out is a terminal + try { + // isatty system call will return 1 if the file descriptor is terminal else 0 + if (isatty(STDOUT_FILENO) == 0) { + return false; + } + } catch (NoClassDefFoundError ignore) { + // These errors happen if the JNI lib is not available for your platform. + return false; + } catch (UnsatisfiedLinkError ignore) { + // These errors happen if the JNI lib is not available for your platform. + return false; + } + return true; + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Erases the current line and prints the given line. + * @param line - line to print + */ + public void reprintLine(String line) { + out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); + out.flush(); + lines++; } /** - * monitorExecution handles status printing, failures during execution and final - * status retrieval. + * NOTE: Use this method only if isUnixTerminal is true. + * Erases the current line and prints the given line with the specified color. + * @param line - line to print + * @param color - color for the line + */ + public void reprintLineWithColorAsBold(String line, Ansi.Color color) { + out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset() + .toString()); + out.flush(); + lines++; + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Erases the current line and prints the given multiline. Make sure the specified line is not + * terminated by linebreak. + * @param line - line to print + */ + public void reprintMultiLine(String line) { + int numLines = line.split("\r\n|\r|\n").length; + out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString()); + out.flush(); + lines += numLines; + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Repositions the cursor back to line 0. + */ + public void repositionCursor() { + if (lines > 0) { + out.print(ansi().cursorUp(lines).toString()); + out.flush(); + lines = 0; + } + } + + /** + * NOTE: Use this method only if isUnixTerminal is true. + * Gets the width of the terminal + * @return - width of terminal + */ + public int getTerminalWidth() { + return Terminal.getTerminal().getTerminalWidth(); + } + + /** + * monitorExecution handles status printing, failures during execution and final status retrieval. * * @param dagClient client that was used to kick off the job * @param txnMgr transaction manager for this operation * @param conf configuration file for this operation * @return int 0 - success, 1 - killed, 2 - failed */ - public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, - HiveConf conf) throws InterruptedException { + public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf, + DAG dag) throws InterruptedException { DAGStatus status = null; completed = new HashSet(); @@ -109,6 +250,22 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, Set opts = new HashSet(); Heartbeater heartbeater = new Heartbeater(txnMgr, conf); long startTime = 0; + boolean isProfileEnabled = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY); + boolean inPlaceUpdates = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS); + boolean wideTerminal = false; + boolean isTerminal = inPlaceUpdates == true ? isUnixTerminal() : false; + + // we need at least 80 chars wide terminal to display in-place updates properly + if (isTerminal) { + if (getTerminalWidth() >= MIN_TERMINAL_WIDTH) { + wideTerminal = true; + } + } + + boolean inPlaceEligible = false; + if (inPlaceUpdates && isTerminal && wideTerminal && !console.getIsSilent()) { + inPlaceEligible = true; + } shutdownList.add(dagClient); @@ -116,7 +273,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); - while(true) { + while (true) { try { status = dagClient.getDAGStatus(opts); @@ -127,7 +284,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, if (state != lastState || state == RUNNING) { lastState = state; - switch(state) { + switch (state) { case SUBMITTED: console.printInfo("Status: Submitted"); break; @@ -138,23 +295,49 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, if (!running) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n"); - for (String s: progressMap.keySet()) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); - } startTime = System.currentTimeMillis(); running = true; } - lastReport = printStatus(progressMap, lastReport, console); + if (inPlaceEligible) { + printStatusInPlace(progressMap, startTime, false, dagClient); + // log the progress report to log file as well + console.logInfo(getReport(progressMap)); + } else { + lastReport = printStatus(progressMap, lastReport, console); + } break; case SUCCEEDED: - lastReport = printStatus(progressMap, lastReport, console); - double duration = (System.currentTimeMillis() - startTime)/1000.0; - console.printInfo("Status: Finished successfully in " + String.format("%.2f seconds", duration)); + if (inPlaceEligible) { + printStatusInPlace(progressMap, startTime, false, dagClient); + // log the progress report to log file as well + console.logInfo(getReport(progressMap)); + } else { + lastReport = printStatus(progressMap, lastReport, console); + } + + /* Profile info is collected anyways, isProfileEnabled + * decides if it gets printed or not + */ + if (isProfileEnabled) { + + double duration = (System.currentTimeMillis() - startTime) / 1000.0; + console.printInfo("Status: DAG finished successfully in " + + String.format("%.2f seconds", duration)); + console.printInfo("\n"); + + printMethodsSummary(); + printDagSummary(progressMap, console, dagClient, conf, dag); + } running = false; done = true; break; case KILLED: + if (inPlaceEligible) { + printStatusInPlace(progressMap, startTime, true, dagClient); + // log the progress report to log file as well + console.logInfo(getReport(progressMap)); + } console.printInfo("Status: Killed"); running = false; done = true; @@ -162,6 +345,11 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, break; case FAILED: case ERROR: + if (inPlaceEligible) { + printStatusInPlace(progressMap, startTime, true, dagClient); + // log the progress report to log file as well + console.logInfo(getReport(progressMap)); + } console.printError("Status: Failed"); running = false; done = true; @@ -173,15 +361,15 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, Thread.sleep(checkInterval); } } catch (Exception e) { - console.printInfo("Exception: "+e.getMessage()); - if (++failedCounter % maxRetryInterval/checkInterval == 0 + console.printInfo("Exception: " + e.getMessage()); + if (++failedCounter % maxRetryInterval / checkInterval == 0 || e instanceof InterruptedException) { try { console.printInfo("Killing DAG..."); dagClient.tryKillDAG(); - } catch(IOException io) { + } catch (IOException io) { // best effort - } catch(TezException te) { + } catch (TezException te) { // best effort } e.printStackTrace(); @@ -194,7 +382,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, } finally { if (done) { if (rc != 0 && status != null) { - for (String diag: status.getDiagnostics()) { + for (String diag : status.getDiagnostics()) { console.printError(diag); } } @@ -222,7 +410,323 @@ public static void killRunningJobs() { } } + private static long getCounterValueByGroupName(TezCounters vertexCounters, + String groupNamePattern, + String counterName) { + TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName); + return (tezCounter == null) ? 0 : tezCounter.getValue(); + } + + private void printMethodsSummary() { + long totalInPrepTime = 0; + + String[] perfLoggerReportMethods = { + (PerfLogger.PARSE), + (PerfLogger.ANALYZE), + (PerfLogger.TEZ_BUILD_DAG), + (PerfLogger.TEZ_SUBMIT_TO_RUNNING) + }; + + /* Build the method summary header */ + String methodBreakdownHeader = String.format("%-30s %-13s", METHOD, DURATION); + console.printInfo(methodBreakdownHeader); + + for (String method : perfLoggerReportMethods) { + long duration = perfLogger.getDuration(method); + totalInPrepTime += duration; + console.printInfo(String.format("%-30s %11s", method, commaFormat.format(duration))); + } + + /* + * The counters list above don't capture the total time from TimeToSubmit.startTime till + * TezRunDag.startTime, so calculate the duration and print it. + */ + totalInPrepTime = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) - + perfLogger.getStartTime(PerfLogger.TIME_TO_SUBMIT); + + console.printInfo(String.format("%-30s %11s\n", TOTAL_PREP_TIME, commaFormat.format( + totalInPrepTime))); + } + + private void printDagSummary(Map progressMap, LogHelper console, + DAGClient dagClient, HiveConf conf, DAG dag) { + + /* Strings for headers and counters */ + String hiveCountersGroup = conf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP); + Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); + TezCounters hiveCounters = null; + try { + hiveCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters(); + } catch (IOException e) { + // best attempt, shouldn't really kill DAG for this + } catch (TezException e) { + // best attempt, shouldn't really kill DAG for this + } + + /* If the counters are missing there is no point trying to print progress */ + if (hiveCounters == null) { + return; + } + + /* Print the per Vertex summary */ + console.printInfo(SUMMARY_HEADER); + SortedSet keys = new TreeSet(progressMap.keySet()); + Set statusOptions = new HashSet(1); + statusOptions.add(StatusGetOpts.GET_COUNTERS); + for (String vertexName : keys) { + Progress progress = progressMap.get(vertexName); + if (progress != null) { + final int totalTasks = progress.getTotalTaskCount(); + final int failedTaskAttempts = progress.getFailedTaskAttemptCount(); + final int killedTasks = progress.getKilledTaskCount(); + final double duration = + perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName) / 1000.0; + VertexStatus vertexStatus = null; + try { + vertexStatus = dagClient.getVertexStatus(vertexName, statusOptions); + } catch (IOException e) { + // best attempt, shouldn't really kill DAG for this + } catch (TezException e) { + // best attempt, shouldn't really kill DAG for this + } + + if (vertexStatus == null) { + continue; + } + + Vertex currentVertex = dag.getVertex(vertexName); + List inputVerticesList = currentVertex.getInputVertices(); + long hiveInputRecordsFromOtherVertices = 0; + if (inputVerticesList.size() > 0) { + + for (Vertex inputVertex : inputVerticesList) { + String inputVertexName = inputVertex.getName(); + hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters, + hiveCountersGroup, String.format("%s_", + ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()) + + inputVertexName.replace(" ", "_")); + + hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters, + hiveCountersGroup, String.format("%s_", + FileSinkOperator.Counter.RECORDS_OUT.toString()) + + inputVertexName.replace(" ", "_")); + } + } + + /* + * Get the CPU & GC + * + * counters org.apache.tez.common.counters.TaskCounter + * GC_TIME_MILLIS=37712 + * CPU_MILLISECONDS=2774230 + */ + final TezCounters vertexCounters = vertexStatus.getVertexCounters(); + final double cpuTimeMillis = getCounterValueByGroupName(vertexCounters, + TaskCounter.class.getName(), + TaskCounter.CPU_MILLISECONDS.name()); + + final double gcTimeMillis = getCounterValueByGroupName(vertexCounters, + TaskCounter.class.getName(), + TaskCounter.GC_TIME_MILLIS.name()); + + /* + * Get the HIVE counters + * + * HIVE + * CREATED_FILES=1 + * DESERIALIZE_ERRORS=0 + * RECORDS_IN_Map_1=550076554 + * RECORDS_OUT_INTERMEDIATE_Map_1=854987 + * RECORDS_OUT_Reducer_2=1 + */ + final long hiveInputRecords = getCounterValueByGroupName(hiveCounters, hiveCountersGroup, + MapOperator.Counter.RECORDS_IN.toString()) + hiveInputRecordsFromOtherVertices; + final long hiveOutputIntermediateRecords = getCounterValueByGroupName(hiveCounters, + hiveCountersGroup, ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()); + final long hiveOutputRecords = getCounterValueByGroupName(hiveCounters, hiveCountersGroup, + FileSinkOperator.Counter.RECORDS_OUT.toString()) + hiveOutputIntermediateRecords; + + String vertexExecutionStats = String.format(SUMMARY_VERTEX_FORMAT, + vertexName, + totalTasks, + failedTaskAttempts, + killedTasks, + secondsFormat.format((duration)), + commaFormat.format(cpuTimeMillis), + commaFormat.format(gcTimeMillis), + commaFormat.format(hiveInputRecords), + commaFormat.format(hiveOutputRecords)); + console.printInfo(vertexExecutionStats); + } + } + } + + private void printStatusInPlace(Map progressMap, long startTime, + boolean vextexStatusFromAM, DAGClient dagClient) { + StringBuffer reportBuffer = new StringBuffer(); + int sumComplete = 0; + int sumTotal = 0; + + // position the cursor to line 0 + repositionCursor(); + + // print header + // ------------------------------------------------------------------------------- + // VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED + // ------------------------------------------------------------------------------- + reprintLine(separator); + reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN); + reprintLine(separator); + + SortedSet keys = new TreeSet(progressMap.keySet()); + int idx = 0; + int maxKeys = keys.size(); + for (String s : keys) { + idx++; + Progress progress = progressMap.get(s); + final int complete = progress.getSucceededTaskCount(); + final int total = progress.getTotalTaskCount(); + final int running = progress.getRunningTaskCount(); + final int failed = progress.getFailedTaskAttemptCount(); + final int pending = progress.getTotalTaskCount() - progress.getSucceededTaskCount() - + progress.getRunningTaskCount(); + final int killed = progress.getKilledTaskCount(); + + // To get vertex status we can use DAGClient.getVertexStatus(), but it will be expensive to + // get status from AM for every refresh of the UI. Lets infer the state from task counts. + // Only if DAG is FAILED or KILLED the vertex status is fetched from AM. + VertexStatus.State vertexState = VertexStatus.State.INITIALIZING; + + // INITED state + if (total > 0) { + vertexState = VertexStatus.State.INITED; + sumComplete += complete; + sumTotal += total; + } + + // RUNNING state + if (complete < total && (complete > 0 || running > 0 || failed > 0)) { + vertexState = VertexStatus.State.RUNNING; + if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + } + + // SUCCEEDED state + if (complete == total) { + vertexState = VertexStatus.State.SUCCEEDED; + if (!completed.contains(s)) { + completed.add(s); + + /* We may have missed the start of the vertex + * due to the 3 seconds interval + */ + if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + } + + // DAG might have been killed, lets try to get vertex state from AM before dying + // KILLED or FAILED state + if (vextexStatusFromAM) { + VertexStatus vertexStatus = null; + try { + vertexStatus = dagClient.getVertexStatus(s, null); + } catch (IOException e) { + // best attempt, shouldn't really kill DAG for this + } catch (TezException e) { + // best attempt, shouldn't really kill DAG for this + } + if (vertexStatus != null) { + vertexState = vertexStatus.getState(); + } + } + + // Map 1 .......... SUCCEEDED 7 7 0 0 0 0 + String nameWithProgress = getNameWithProgress(s, complete, total); + String vertexStr = String.format(VERTEX_FORMAT, + nameWithProgress, + vertexState.toString(), + total, + complete, + running, + pending, + failed, + killed); + reportBuffer.append(vertexStr); + if (idx != maxKeys) { + reportBuffer.append("\n"); + } + } + + reprintMultiLine(reportBuffer.toString()); + + // ------------------------------------------------------------------------------- + // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s + // ------------------------------------------------------------------------------- + reprintLine(separator); + final float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal; + String footer = getFooter(keys.size(), completed.size(), progress, startTime); + reprintLineWithColorAsBold(footer, Ansi.Color.RED); + reprintLine(separator); + } + + // Map 1 .......... + private String getNameWithProgress(String s, int complete, int total) { + float percent = total == 0 ? 0.0f : (float) complete / (float) total; + // lets use the remaining space in column 1 as progress bar + int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1; + String result = s + " "; + int toFill = (int) (spaceRemaining * percent); + for (int i = 0; i < toFill; i++) { + result += "."; + } + return result; + } + + // VERTICES: 03/04 [==================>>-----] 86% ELAPSED TIME: 1.71 s + private String getFooter(int keySize, int completedSize, float progress, long startTime) { + String verticesSummary = String.format("VERTICES: %02d/%02d", completedSize, keySize); + String progressBar = getInPlaceProgressBar(progress); + final int progressPercent = (int) (progress * 100); + String progressStr = "" + progressPercent + "%"; + float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000; + String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s"; + String footer = String.format(FOOTER_FORMAT, + verticesSummary, progressBar, progressStr, elapsedTime); + return footer; + } + + // [==================>>-----] + private String getInPlaceProgressBar(float percent) { + StringBuilder bar = new StringBuilder("["); + int remainingChars = progressBarChars - 4; + int completed = (int) (remainingChars * percent); + int pending = remainingChars - completed; + for (int i = 0; i < completed; i++) { + bar.append("="); + } + bar.append(">>"); + for (int i = 0; i < pending; i++) { + bar.append("-"); + } + bar.append("]"); + return bar.toString(); + } + private String printStatus(Map progressMap, String lastReport, LogHelper console) { + String report = getReport(progressMap); + if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) { + console.printInfo(report); + lastPrintTime = System.currentTimeMillis(); + } + return report; + } + + private String getReport(Map progressMap) { StringBuffer reportBuffer = new StringBuffer(); SortedSet keys = new TreeSet(progressMap.keySet()); @@ -231,7 +735,7 @@ private String printStatus(Map progressMap, String lastReport, final int complete = progress.getSucceededTaskCount(); final int total = progress.getTotalTaskCount(); final int running = progress.getRunningTaskCount(); - final int failed = progress.getFailedTaskCount(); + final int failed = progress.getFailedTaskAttemptCount(); if (total <= 0) { reportBuffer.append(String.format("%s: -/-\t", s, complete, total)); } else { @@ -258,12 +762,6 @@ private String printStatus(Map progressMap, String lastReport, } } - String report = reportBuffer.toString(); - if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) { - console.printInfo(report); - lastPrintTime = System.currentTimeMillis(); - } - - return report; + return reportBuffer.toString(); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index e6bbe9a..97684a8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -163,7 +163,7 @@ public int execute(DriverContext driverContext) { // finally monitor will print progress until the job is done TezJobMonitor monitor = new TezJobMonitor(); - rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf); + rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, dag); // fetch the counters Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); diff --git ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java index c4327fc..4e2b130 100644 --- ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java +++ ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java @@ -18,14 +18,14 @@ package org.apache.hadoop.hive.ql.log; -import java.util.HashMap; -import java.util.Map; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.session.SessionState; +import java.util.HashMap; +import java.util.Map; + /** * PerfLogger. * @@ -147,10 +147,37 @@ public void close(Log _log, QueryPlan queryPlan) { } public Long getStartTime(String method) { - return startTimes.get(method); + long startTime = 0L; + + if (startTimes.containsKey(method)) { + startTime = startTimes.get(method); + } + return startTime; } public Long getEndTime(String method) { - return endTimes.get(method); + long endTime = 0L; + + if (endTimes.containsKey(method)) { + endTime = endTimes.get(method); + } + return endTime; } + + public boolean startTimeHasMethod(String method) { + return startTimes.containsKey(method); + } + + public boolean endTimeHasMethod(String method) { + return endTimes.containsKey(method); + } + + public Long getDuration(String method) { + long duration = 0; + if (startTimes.containsKey(method) && endTimes.containsKey(method)) { + duration = endTimes.get(method) - startTimes.get(method); + } + return duration; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index af633cb..2806bd1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -805,6 +805,14 @@ public boolean getIsSilent() { return (ss != null) ? ss.getIsSilent() : isSilent; } + public void logInfo(String info) { + logInfo(info, null); + } + + public void logInfo(String info, String detail) { + LOG.info(info + StringUtils.defaultString(detail)); + } + public void printInfo(String info) { printInfo(info, null); }