diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 96e6e61..c74df78 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1912,7 +1912,15 @@
TEZ_SMB_NUMBER_WAVES(
"hive.tez.smb.number.waves",
(float) 0.5,
- "The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave.")
+ "The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave."),
+ TEZ_EXEC_SUMMARY(
+ "hive.tez.exec.print.summary",
+ false,
+ "Display breakdown of execution steps, for every query executed by the shell."),
+ TEZ_PROGRESS_IN_PLACE_UPDATES(
+ "hive.tez.progress.update.inplace",
+ true,
+ "Updates tez job execution progress in-place in the terminal.")
;
public final String varname;
diff --git pom.xml pom.xml
index bd74830..a5f851f 100644
--- pom.xml
+++ pom.xml
@@ -129,6 +129,7 @@
7.6.0.v20120127
1.14
0.9.94
+ 1.11
1.1
3.5.2
20090211
@@ -336,6 +337,11 @@
${jline.version}
+ org.fusesource.jansi
+ jansi
+ ${jansi.version}
+
+
junit
junit
${junit.version}
diff --git ql/pom.xml ql/pom.xml
index c373431..c3f1721 100644
--- ql/pom.xml
+++ ql/pom.xml
@@ -274,6 +274,16 @@
test
+ jline
+ jline
+ ${jline.version}
+
+
+ org.fusesource.jansi
+ jansi
+ ${jansi.version}
+
+
org.apache.tez
tez-api
${tez.version}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 9bcdeed..3ba8f39 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -19,9 +19,15 @@
package org.apache.hadoop.hive.ql.exec.tez;
import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
+import static org.fusesource.jansi.Ansi.ansi;
+import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
+import static org.fusesource.jansi.internal.CLibrary.isatty;
import java.io.IOException;
+import java.io.PrintStream;
+import java.text.DecimalFormat;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -33,15 +39,29 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Heartbeater;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.fusesource.jansi.Ansi;
+
+import jline.Terminal;
/**
* TezJobMonitor keeps track of a tez job while it's being executed. It will
@@ -50,16 +70,54 @@
*/
public class TezJobMonitor {
- private static final Log LOG = LogFactory.getLog(TezJobMonitor.class.getName());
private static final String CLASS_NAME = TezJobMonitor.class.getName();
+ public static final boolean UNIX_TERMINAL = detectUnixTerminal(); // evaluated once, at class load
+ private static final int MIN_TERMINAL_WIDTH = 80; // narrower terminals get the plain progress lines
+ private static final int COLUMN_1_WIDTH = 16; // width of the vertex-name column of the in-place table
+ private static final String HEADER = String.format("%16s %10s %5s %9s %7s %7s %6s %6s",
+ "VERTICES",
+ "STATUS",
+ "TOTAL",
+ "COMPLETED",
+ "RUNNING",
+ "PENDING",
+ "FAILED",
+ "KILLED");
+ private static final String SEPARATOR = "-------------------------------------------------------------------------------";
+ private static final String TOTAL_PREP_TIME = "TotalPrepTime"; // label of the last row of the method summary
+ private static final String METHOD = "METHOD"; // method summary: first column title
+ private static final String DURATION = "DURATION_MS"; // method summary: second column title
+ private static final String VERTICES = "VERTICES"; // label of the vertex counts in the textual report
+ private static final String VERTEX_NAME = "VERTEX"; // from here on: column titles of the per-vertex summary
+ private static final String TOTAL_TASKS = "TOTAL_TASKS";
+ private static final String FAILED_TASKS = "FAILED_TASKS";
+ private static final String KILLED_TASKS = "KILLED_TASKS";
+ private static final String ELAPSED_TIME = "DURATION_SECONDS";
+ private static final String CPU_TIME = "CPU_TIME_MILLIS";
+ private static final String GC_TIME = "GC_TIME_MILLIS";
+ private static final String INPUT_RECORDS = "INPUT_RECORDS";
+ private static final String OUTPUT_RECORDS = "OUTPUT_RECORDS";
+
+ // in-place progress update related variables
+ private int lines; // terminal lines written since the cursor was last repositioned
+ private PrintStream out; // the console's info stream; in-place updates are written here
private transient LogHelper console;
private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
private final int checkInterval = 200;
private final int maxRetryInterval = 2500;
private final int printInterval = 3000;
+ private final int progressBarChars = 25; // width of the progress bars, in characters
+ private final int progressPercentChars = 3; // field width of the percent number inside the bar
+ private final int progressDevisor = 100 / Math.min(100, progressBarChars); // percent points per bar character
private long lastPrintTime;
private Set completed;
+
+ /* Formatters used to pretty print the summary values; their patterns are set in the constructor */
+ private final DecimalFormat msDurationFormatter;
+ private final DecimalFormat secondDurationFormatter;
+ private final DecimalFormat decimalFormatter;
+
private static final List shutdownList;
static {
@@ -83,20 +141,112 @@ public void run() {
}
public TezJobMonitor() {
- console = new LogHelper(LOG);
+ console = SessionState.getConsole();
+ msDurationFormatter = new DecimalFormat("###,###,###,###,###,###");
+ // "##0.00", not "###0.00": the last integer group defines the grouping size and must stay 3
+ secondDurationFormatter = new DecimalFormat("###,###,###,###,###,##0.00");
+ decimalFormatter = new DecimalFormat("###,###,###,###,###,###,###");
+ // progress goes to the info stream and the log file; in-place updates only to the info stream
+ out = console.getInfoStream();
+ }
+
+ private static boolean detectUnixTerminal() {
+   // Decides whether in-place (ANSI) progress rendering can be used: the OS must not be
+   // Windows and standard out must be attached to a terminal.
+   final String osName = System.getProperty("os.name");
+   final boolean onWindows = osName.startsWith("Windows");
+   if (onWindows) {
+     // Windows is not supported for now; revisit if it is really needed there.
+     return false;
+   }
+
+   // Anything else is treated as a unix variant; ask the native layer about standard out.
+   boolean stdoutIsTerminal;
+   try {
+     // isatty yields 0 when the file descriptor is not a terminal
+     stdoutIsTerminal = isatty(STDOUT_FILENO) != 0;
+   } catch (NoClassDefFoundError ignore) {
+     // happens when the JNI lib is not available for this platform
+     stdoutIsTerminal = false;
+   } catch (UnsatisfiedLinkError ignore) {
+     // happens when the JNI lib is not available for this platform
+     stdoutIsTerminal = false;
+   }
+   return stdoutIsTerminal;
+ }
+
+ public boolean isUnixTerminal() {
+ return UNIX_TERMINAL; // cached result of detectUnixTerminal(), computed once at class load
+ }
+
+ /**
+  * Only for unix terminals (see isUnixTerminal): wipes the current line, writes the text plus
+  * a newline and counts the line. @param line - text to write on the wiped line
+  */
+ public void reprintLine(String line) {
+   final String rendered = ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString();
+   out.print(rendered);
+   out.flush();
+   lines += 1;
+ }
+
+ /**
+  * Only for unix terminals (see isUnixTerminal): wipes the current line and writes the text in
+  * bold with the given foreground color, then a newline; bold and color are reset afterwards.
+  * @param line - text to write
+  * @param color - foreground color for the text
+  */
+ public void reprintLineWithColorAsBold(String line, Ansi.Color color) {
+   final Ansi styled = ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n');
+   out.print(styled.boldOff().reset().toString());
+   out.flush();
+   lines += 1;
+ }
+
+ /**
+  * Only for unix terminals (see isUnixTerminal): wipes the current line and writes text that
+  * may span several rows. Do not end the text with a line break; one is appended here.
+  * @param line - text to write, rows separated by line breaks
+  */
+ public void reprintMultiLine(String line) {
+   final String[] rows = line.split("\r\n|\r|\n");
+   final String rendered = ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString();
+   out.print(rendered);
+   out.flush();
+   lines = lines + rows.length;
+ }
+
+ /**
+  * Only for unix terminals (see isUnixTerminal): moves the cursor back up to line 0.
+  */
+ public void repositionCursor() {
+   if (lines <= 0) {
+     return; // nothing has been written since the last reposition
+   }
+   out.print(ansi().cursorUp(lines).toString());
+   out.flush();
+   lines = 0;
+ }
+
+ /**
+ * NOTE: Use this method only if isUnixTerminal is true.
+ * Gets the width of the terminal, as reported by jline's Terminal.getTerminal().
+ * @return - width of terminal
+ */
+ public int getTerminalWidth() {
+ return Terminal.getTerminal().getTerminalWidth();
}
/**
- * monitorExecution handles status printing, failures during execution and final
- * status retrieval.
+ * monitorExecution handles status printing, failures during execution and final status retrieval.
*
* @param dagClient client that was used to kick off the job
* @param txnMgr transaction manager for this operation
* @param conf configuration file for this operation
* @return int 0 - success, 1 - killed, 2 - failed
*/
- public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
- HiveConf conf) throws InterruptedException {
+ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf,
+ JobConf jobConf, DAG dag) throws InterruptedException {
DAGStatus status = null;
completed = new HashSet();
@@ -109,6 +259,14 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
Set opts = new HashSet();
Heartbeater heartbeater = new Heartbeater(txnMgr, conf);
long startTime = 0;
+ boolean isProfileEnabled = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY);
+ boolean inPlaceUpdates = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_PROGRESS_IN_PLACE_UPDATES);
+ boolean wideTerminal = false;
+ if (isUnixTerminal()) {
+ if (getTerminalWidth() >= MIN_TERMINAL_WIDTH) {
+ wideTerminal = true;
+ }
+ }
shutdownList.add(dagClient);
@@ -116,7 +274,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
- while(true) {
+ while (true) {
try {
status = dagClient.getDAGStatus(opts);
@@ -127,7 +285,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
if (state != lastState || state == RUNNING) {
lastState = state;
- switch(state) {
+ switch (state) {
case SUBMITTED:
console.printInfo("Status: Submitted");
break;
@@ -138,19 +296,40 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
if (!running) {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n");
- for (String s: progressMap.keySet()) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
startTime = System.currentTimeMillis();
running = true;
}
- lastReport = printStatus(progressMap, lastReport, console);
+ if (inPlaceUpdates && isUnixTerminal() && wideTerminal && !console.getIsSilent()) {
+ printStatusInPlace(progressMap, console, startTime);
+ // log the progress report to log file as well
+ console.logInfo(getReport(progressMap));
+ } else {
+ lastReport = printStatus(progressMap, lastReport, console);
+ }
break;
case SUCCEEDED:
- lastReport = printStatus(progressMap, lastReport, console);
- double duration = (System.currentTimeMillis() - startTime)/1000.0;
- console.printInfo("Status: Finished successfully in " + String.format("%.2f seconds", duration));
+ if (inPlaceUpdates && isUnixTerminal() && wideTerminal && !console.getIsSilent()) {
+ printStatusInPlace(progressMap, console, startTime);
+ // log the progress report to log file as well
+ console.logInfo(getReport(progressMap));
+ } else {
+ lastReport = printStatus(progressMap, lastReport, console);
+ }
+
+ /* Profile info is collected anyways, isProfileEnabled
+ * decides if it gets printed or not
+ */
+ if (isProfileEnabled) {
+
+ double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+ console.printInfo("Status: DAG finished successfully in "
+ + String.format("%.2f seconds", duration));
+ console.printInfo("\n");
+
+ printMethodsSummary();
+ printDagSummary(progressMap, console, dagClient, conf, jobConf, dag);
+ }
running = false;
done = true;
break;
@@ -173,15 +352,15 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
Thread.sleep(checkInterval);
}
} catch (Exception e) {
- console.printInfo("Exception: "+e.getMessage());
- if (++failedCounter % maxRetryInterval/checkInterval == 0
+ console.printInfo("Exception: " + e.getMessage());
+ if (++failedCounter % maxRetryInterval / checkInterval == 0
|| e instanceof InterruptedException) {
try {
console.printInfo("Killing DAG...");
dagClient.tryKillDAG();
- } catch(IOException io) {
+ } catch (IOException io) {
// best effort
- } catch(TezException te) {
+ } catch (TezException te) {
// best effort
}
e.printStackTrace();
@@ -194,7 +373,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
} finally {
if (done) {
if (rc != 0 && status != null) {
- for (String diag: status.getDiagnostics()) {
+ for (String diag : status.getDiagnostics()) {
console.printError(diag);
}
}
@@ -222,48 +401,464 @@ public static void killRunningJobs() {
}
}
+ private String getProgressBar(int percent) {
+   // Bar of progressBarChars cells, percentage label in the middle; '=' done, '>' current position.
+   final String label = String.format("%1$" + progressPercentChars + "s", percent) + "%";
+   final int reserved = label.length() + 2;
+   final int labelStart = (progressBarChars - reserved) / 2;
+   final int labelEnd = (progressBarChars + reserved) / 2 - 1;
+   final int filled = percent / progressDevisor;
+   final StringBuilder sb = new StringBuilder("[");
+   int cell = 0;
+   while (cell < progressBarChars) {
+     final boolean outsideLabel = cell < labelStart || cell > labelEnd;
+     if (outsideLabel && cell < filled) {
+       sb.append("=");
+     } else if (cell == labelStart) {
+       sb.append(label);
+       cell += label.length(); // jump over the cells the label occupies
+     } else if (outsideLabel && cell == filled) {
+       sb.append(">");
+     } else {
+       sb.append(" ");
+     }
+     cell++;
+   }
+   return sb.append("] ").toString();
+ }
+
+ private String getAggregatedVerticesStatus(int verticesCount, int completedVertices, int runningVertices) {
+   // Vertex summary of the textual report: "VERTICES: done/total", with "(+running)" added
+   // after the done count while at least one vertex is running.
+
+   final String summary;
+   if (runningVertices <= 0) {
+     summary = String.format("%s: %d/%d\t", VERTICES, completedVertices, verticesCount);
+   } else {
+     summary = String.format("%s: %d(+%d)/%d\t", VERTICES, completedVertices,
+         runningVertices, verticesCount);
+   }
+
+   return summary;
+ }
+
+ private static long getCounterValueByGroupName(TezCounters vertexCounters, String groupNamePattern,
+     String counterName) {
+   // Sums counterName over all groups whose name equals groupNamePattern; missing counters add 0.
+   long total = 0;
+   for (String candidate : vertexCounters.getGroupNames()) {
+     if (!candidate.equals(groupNamePattern)) {
+       continue;
+     }
+     final TezCounter counter = vertexCounters.getGroup(candidate).findCounter(counterName);
+     if (counter != null) {
+       total += counter.getValue();
+     }
+   }
+   return total;
+ }
+
+ private void printMethodsSummary() {
+   // Prints a two-column METHOD / DURATION_MS table for the query preparation phases,
+   // followed by the total preparation time and an empty separator line.
+   String[] perfLoggerReportMethods = {
+       (PerfLogger.PARSE),
+       (PerfLogger.ANALYZE),
+       (PerfLogger.TEZ_BUILD_DAG),
+       (PerfLogger.TEZ_SUBMIT_TO_RUNNING)
+   };
+
+   /* Build the method summary header */
+   String methodBreakdownHeader = String.format("%-30s %-13s", METHOD, DURATION);
+   console.printInfo(methodBreakdownHeader);
+
+   for (String method : perfLoggerReportMethods) {
+     long duration = perfLogger.getDuration(method);
+     console.printInfo(String.format("%-30s %11s", method, msDurationFormatter.format(duration)));
+   }
+
+   /*
+    * The phases listed above don't capture the total time from TimeToSubmit.startTime till
+    * TezRunDag.startTime, so the total is taken from those two start times directly; the
+    * per-phase durations are deliberately not summed up.
+    */
+   long totalInPrepTime =
+       perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG)
+           - perfLogger.getStartTime(PerfLogger.TIME_TO_SUBMIT);
+
+   console.printInfo(String.format("%-30s %11s", TOTAL_PREP_TIME,
+       msDurationFormatter.format(totalInPrepTime)));
+
+   console.printInfo("\n");
+ }
+
+ private void printDagSummary(Map progressMap, LogHelper console,
+ DAGClient dagClient, HiveConf conf, JobConf jobConf, DAG dag) throws IOException,
+ TezException {
+ // Prints a table with one row per vertex: task counts, duration, CPU/GC time and record counts.
+ VertexStatus vertexStatus;
+ TezCounters vertexCounters;
+ TezCounter tezCounter;
+ double cpuTimeMillis = 0;
+ double gcTimeMillis = 0;
+ double hiveInputRecords = 0;
+ double hiveOutputRecords = 0;
+ double hiveOutputIntermediateRecords = 0;
+ double hiveInputRecordsFromOtherVertices = 0;
+ double duration = 0;
+
+ int totalTasks = 0;
+ int failedTasks = 0;
+ int killedTasks = 0;
+ // NOTE(review): the jobConf parameter and the tezCounter local are never used in this method.
+ /* Name of the Hive counter group, and the buffer for each formatted row */
+ String hiveCountersGroup = conf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+ String vertexExecutionStats;
+
+ /* Build the header for the per vertex summary */
+ String vertexSummaryHeader =
+ String.format("%-12s %-12s %-12s %-15s %-20s %-15s %-15s %-15s %-15s", VERTEX_NAME,
+ TOTAL_TASKS, FAILED_TASKS, KILLED_TASKS, ELAPSED_TIME, CPU_TIME, GC_TIME,
+ INPUT_RECORDS, OUTPUT_RECORDS);
+
+ /* Print the header row of the per vertex summary */
+ console.printInfo(vertexSummaryHeader);
+
+ Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+ TezCounters hiveCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
+
+ /* Without DAG counters the rows cannot be filled in, so stop after the header */
+ if (hiveCounters == null) {
+ return;
+ }
+
+ SortedSet keys = new TreeSet(progressMap.keySet());
+ // NOTE(review): statusOptions holds the same single option as statusGetOpts above.
+ Set statusOptions = new HashSet(1);
+ statusOptions.add(StatusGetOpts.GET_COUNTERS);
+
+ for (String vertexName : keys) {
+ Progress progress = progressMap.get(vertexName);
+
+ /* Skip vertices that have no progress entry rather than hitting an NPE */
+ if (progress == null) {
+ continue;
+ }
+
+ totalTasks = progress.getTotalTaskCount();
+ failedTasks = progress.getFailedTaskCount();
+ killedTasks = progress.getKilledTaskCount();
+ hiveInputRecordsFromOtherVertices = 0;
+ // duration of the vertex as recorded by the perf logger, divided by 1000 for display
+ duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName) / 1000.0;
+
+ vertexStatus = dagClient.getVertexStatus(vertexName, statusOptions);
+
+ if (vertexStatus == null) {
+ continue;
+ }
+ // records emitted by the input vertices (intermediate and final) count as input of this vertex
+ Vertex currentVertex = dag.getVertex(vertexName);
+ List inputVerticesList = currentVertex.getInputVertices();
+ if (inputVerticesList.size() > 0) {
+
+ for (Vertex inputVertex : inputVerticesList) {
+
+ String inputVertexName = inputVertex.getName();
+
+ hiveInputRecordsFromOtherVertices +=
+ getCounterValueByGroupName(
+ hiveCounters,
+ hiveCountersGroup,
+ String.format("%s_",
+ ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString())
+ + inputVertexName.replace(" ", "_"));
+
+ hiveInputRecordsFromOtherVertices +=
+ getCounterValueByGroupName(hiveCounters, hiveCountersGroup,
+ String.format("%s_", FileSinkOperator.Counter.RECORDS_OUT.toString())
+ + inputVertexName.replace(" ", "_"));
+ }
+ }
+
+ /*
+ * Get the CPU & GC times from the vertex counters, for example:
+ *
+ * counters org.apache.tez.common.counters.TaskCounter
+ * GC_TIME_MILLIS=37712
+ * CPU_MILLISECONDS=2774230
+ */
+ vertexCounters = vertexStatus.getVertexCounters();
+
+ cpuTimeMillis =
+ getCounterValueByGroupName(vertexCounters, TaskCounter.class.getName(),
+ TaskCounter.CPU_MILLISECONDS.name());
+
+ gcTimeMillis =
+ getCounterValueByGroupName(vertexCounters, TaskCounter.class.getName(),
+ TaskCounter.GC_TIME_MILLIS.name());
+
+ /*
+ * Get the HIVE counters from the DAG counters; names carry the vertex name as suffix:
+ *
+ * HIVE
+ * CREATED_FILES=1
+ * DESERIALIZE_ERRORS=0
+ * RECORDS_IN_Map_1=550076554
+ * RECORDS_OUT_INTERMEDIATE_Map_1=854987
+ * RECORDS_OUT_Reducer_2=1
+ */
+ hiveInputRecords =
+ getCounterValueByGroupName(
+ hiveCounters,
+ hiveCountersGroup,
+ String.format("%s_", MapOperator.Counter.RECORDS_IN.toString())
+ + vertexName.replace(" ", "_"));
+
+ hiveInputRecords += hiveInputRecordsFromOtherVertices;
+
+ hiveOutputRecords =
+ getCounterValueByGroupName(
+ hiveCounters,
+ hiveCountersGroup,
+ String.format("%s_", FileSinkOperator.Counter.RECORDS_OUT.toString())
+ + vertexName.replace(" ", "_"));
+
+ hiveOutputIntermediateRecords =
+ getCounterValueByGroupName(hiveCounters, hiveCountersGroup,
+ String.format("%s_", ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString())
+ + vertexName.replace(" ", "_"));
+
+ hiveOutputRecords += hiveOutputIntermediateRecords;
+
+ vertexExecutionStats =
+ String.format("%-12s %11s %13s %12s %19s %19s %13s %15s %16s", vertexName,
+ Long.toString(totalTasks), Long.toString(failedTasks), Long.toString(killedTasks),
+ secondDurationFormatter.format((duration)), decimalFormatter.format(cpuTimeMillis),
+ decimalFormatter.format(gcTimeMillis), decimalFormatter.format(hiveInputRecords),
+ decimalFormatter.format(hiveOutputRecords));
+
+ console.printInfo(vertexExecutionStats);
+ }
+ }
+
+ // Redraws the progress table in place: header, one row per vertex whose task count is
+ // known (total > 0) and a footer with the overall progress. As a side effect, per-vertex
+ // begin/end marks are recorded in the perf logger.
+ private void printStatusInPlace(Map progressMap, LogHelper console,
+     long startTime) {
+   StringBuffer reportBuffer = new StringBuffer();
+   int sumComplete = 0;
+   int sumTotal = 0;
+
+   // position the cursor to line 0
+   repositionCursor();
+
+   // print header
+   // -------------------------------------------------------------------------------
+   // VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
+   // -------------------------------------------------------------------------------
+   reprintLine(SEPARATOR);
+   reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN);
+   reprintLine(SEPARATOR);
+
+   SortedSet keys = new TreeSet(progressMap.keySet());
+   for (String s : keys) {
+     Progress progress = progressMap.get(s);
+     final int complete = progress.getSucceededTaskCount();
+     final int total = progress.getTotalTaskCount();
+     final int running = progress.getRunningTaskCount();
+     final int failed = progress.getFailedTaskCount();
+     final int pending = progress.getTotalTaskCount() - progress.getSucceededTaskCount() -
+         progress.getRunningTaskCount();
+     final int killed = progress.getKilledTaskCount();
+
+     // we don't really have state information for vertex. Let's mimic the vertex state using the
+     // available task counts
+     DAGStatus.State vertexState = DAGStatus.State.SUBMITTED;
+     sumComplete += complete;
+     sumTotal += total;
+
+     if (total > 0) {
+       if (complete == total) {
+         vertexState = DAGStatus.State.SUCCEEDED;
+         if (!completed.contains(s)) {
+           completed.add(s);
+           /* We may have missed the start of the vertex
+            * due to the 3 seconds interval
+            */
+           if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+             perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+           }
+           perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+         }
+       }
+
+       if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+         vertexState = DAGStatus.State.RUNNING;
+         if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+           perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+         }
+       }
+       // Map 1 .......... SUCCEEDED 7 7 0 0 0 0
+       String nameWithProgress = getNameWithProgress(s, complete, total);
+       String vertexStr = String.format("%-16s %10s %5s %9s %7s %7s %6s %6s",
+           nameWithProgress,
+           vertexState.toString(),
+           total,
+           complete,
+           running,
+           pending,
+           failed,
+           killed);
+       // Put the line break in front of every row but the first, so the text never ends with
+       // one even when trailing vertices are skipped; reprintMultiLine counts lines from it.
+       if (reportBuffer.length() > 0) {
+         reportBuffer.append("\n");
+       }
+       reportBuffer.append(vertexStr);
+     }
+   }
+
+   reprintMultiLine(reportBuffer.toString());
+
+   // -------------------------------------------------------------------------------
+   // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s
+   // -------------------------------------------------------------------------------
+   reprintLine(SEPARATOR);
+   final float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal;
+   String footer = getFooter(keys.size(), completed.size(), progress, startTime);
+   reprintLineWithColorAsBold(footer, Ansi.Color.RED);
+   reprintLine(SEPARATOR);
+ }
+
+ // Renders e.g. "Map 1 ..........": the vertex name followed by dots showing its progress.
+ private String getNameWithProgress(String s, int complete, int total) {
+   final float fraction = (total == 0) ? 0.0f : (float) complete / (float) total;
+   // whatever is left of column 1 after the name and one blank serves as the progress bar
+   final int barWidth = COLUMN_1_WIDTH - s.length() - 1;
+   final int dots = (int) (barWidth * fraction);
+   final StringBuilder label = new StringBuilder(s).append(" ");
+   for (int filled = 0; filled < dots; filled++) {
+     label.append(".");
+   }
+   return label.toString();
+ }
+
+ // Footer row, e.g.: VERTICES: 03/04 [==================>>-----] 86% ELAPSED TIME: 1.71 s
+ private String getFooter(int keySize, int completedSize, float progress, long startTime) {
+   final String vertexCounts = String.format("VERTICES: %02d/%02d", completedSize, keySize);
+   final String bar = getInPlaceProgressBar(progress);
+   final String percentText = (int) (progress * 100) + "%";
+   // time elapsed since startTime, converted from milliseconds to seconds
+   final float elapsedSeconds = (float) (System.currentTimeMillis() - startTime) / (float) 1000;
+   final String elapsedText =
+       "ELAPSED TIME: " + secondDurationFormatter.format(elapsedSeconds) + " s";
+   return String.format("%-15s %-25s %-4s %-25s",
+       vertexCounts, bar, percentText, elapsedText);
+ }
+
+ // Renders e.g. "[==================>>-----]". The argument is the completed fraction
+ // (sumComplete / sumTotal at the call site), not a 0-100 percentage.
+ private String getInPlaceProgressBar(float percent) {
+   // four cells are taken by the two brackets and the ">>" marker
+   final int cells = progressBarChars - 4;
+   final int doneCells = (int) (cells * percent);
+   final StringBuilder sb = new StringBuilder("[");
+   for (int drawn = 0; drawn < doneCells; drawn++) {
+     sb.append("=");
+   }
+   sb.append(">>");
+   for (int left = cells - doneCells; left > 0; left--) {
+     sb.append("-");
+   }
+   return sb.append("]").toString();
+ }
+
private String printStatus(Map progressMap, String lastReport, LogHelper console) {
+   // Print only when the report text changed or printInterval ms passed since the last print.
+   final String report = getReport(progressMap);
+   final boolean changed = !report.equals(lastReport);
+   if (changed || System.currentTimeMillis() >= lastPrintTime + printInterval) {
+     console.printInfo(report);
+     lastPrintTime = System.currentTimeMillis();
+   }
+   return report;
+ }
+
+ // Builds the one-line textual progress report: progress bar, vertex summary and the task
+ // counts of vertices that are running or have failed tasks. As a side effect it records
+ // per-vertex begin/end marks in the perf logger and adds finished vertices to 'completed'.
+ private String getReport(Map progressMap) {
StringBuffer reportBuffer = new StringBuffer();
+ StringBuffer runningVertices = new StringBuffer();
+ int progressPercentage = 0;
+ int sumComplete = 0;
+ int sumTotal = 0;
+
+ int runningVerticesCount = 0;
SortedSet keys = new TreeSet(progressMap.keySet());
- for (String s: keys) {
+ for (String s : keys) {
Progress progress = progressMap.get(s);
final int complete = progress.getSucceededTaskCount();
final int total = progress.getTotalTaskCount();
final int running = progress.getRunningTaskCount();
final int failed = progress.getFailedTaskCount();
- if (total <= 0) {
- reportBuffer.append(String.format("%s: -/-\t", s, complete, total));
- } else {
+ sumComplete += complete;
+ sumTotal += total;
+
+ if (total > 0) {
if (complete == total && !completed.contains(s)) {
completed.add(s);
+ /* We may have missed the start of the vertex
+ * due to the 3 seconds interval
+ */
+ if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
}
- if(complete < total && (complete > 0 || running > 0 || failed > 0)) {
+
+ if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+ if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+ runningVerticesCount++;
+
/* vertex is started, but not complete */
if (failed > 0) {
- reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
+ runningVertices.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running,
+ failed, total));
} else {
- reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
+ // format exactly once: the formatted text must not be re-read as a format string
+ runningVertices.append(String.format("%s: %d(+%d)/%d\t", s, complete,
+ running, total));
}
+
} else {
- /* vertex is waiting for input/slots or complete */
+
if (failed > 0) {
/* tasks finished but some failed */
- reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
- } else {
- reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
+ runningVertices.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
}
}
}
}
- String report = reportBuffer.toString();
- if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) {
- console.printInfo(report);
- lastPrintTime = System.currentTimeMillis();
- }
+ progressPercentage = (sumTotal == 0) ? 0 : 100 * sumComplete / sumTotal;
- return report;
+ String verticesSummary =
+ getAggregatedVerticesStatus(keys.size(), completed.size(), runningVerticesCount);
+
+ String progressBar = getProgressBar(progressPercentage);
+
+ reportBuffer.append(String.format("%12s %12s ", progressBar, verticesSummary));
+
+ if (runningVertices.length() > 0) {
+ reportBuffer.append(String.format("%s: ", "RUNNING")).append(runningVertices);
+ }
+ return reportBuffer.toString();
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 337f2f4..ac4903a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -162,7 +162,7 @@ public int execute(DriverContext driverContext) {
// finally monitor will print progress until the job is done
TezJobMonitor monitor = new TezJobMonitor();
- rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf);
+ rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, jobConf, dag);
// fetch the counters
Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
diff --git ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index c4327fc..0325da6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -147,10 +147,37 @@ public void close(Log _log, QueryPlan queryPlan) {
}
public Long getStartTime(String method) {
- return startTimes.get(method);
+ // A method without a recorded start reports 0 instead of null.
+ if (!startTimes.containsKey(method)) {
+   return 0L;
+ }
+ final Long recordedStart = startTimes.get(method);
+ return recordedStart;
}
public Long getEndTime(String method) {
- return endTimes.get(method);
+ // A method without a recorded end reports 0 instead of null.
+ if (!endTimes.containsKey(method)) {
+   return 0L;
+ }
+ final Long recordedEnd = endTimes.get(method);
+ return recordedEnd;
}
+
+ public boolean startTimeHasMethod(String method) {
+ return startTimes.containsKey(method); // true when startTimes has an entry for this method
+ }
+
+ public boolean endTimeHasMethod(String method) {
+ return endTimes.containsKey(method); // true when endTimes has an entry for this method
+ }
+
+ public Long getDuration(String method) {
+   // Elapsed time (end - start) for the method; 0 unless both times have been recorded.
+   final boolean finished = startTimes.containsKey(method) && endTimes.containsKey(method);
+   return finished
+       ? endTimes.get(method) - startTimes.get(method)
+       : 0L;
+ }
+
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index af633cb..2806bd1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -805,6 +805,14 @@ public boolean getIsSilent() {
return (ss != null) ? ss.getIsSilent() : isSilent;
}
+ public void logInfo(String info) {
+ logInfo(info, null); // same as the two-argument overload, without a detail text
+ }
+
+ public void logInfo(String info, String detail) {
+ LOG.info(info + StringUtils.defaultString(detail)); // NOTE(review): defaultString presumably maps null to "" - confirm the imported StringUtils
+ }
+
public void printInfo(String info) {
printInfo(info, null);
}