diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7d8e5bc..523638b 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1827,7 +1827,11 @@
     TEZ_SMB_NUMBER_WAVES(
         "hive.tez.smb.number.waves",
         (float) 0.5,
-        "The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave.")
+        "The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave."),
+    TEZ_EXEC_SUMMARY(
+        "hive.tez.exec.print.summary",
+        false,
+        "Display breakdown of execution steps, for every query executed by the shell.")
     ;
 
   public final String varname;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 9bcdeed..dd289f0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -21,7 +21,11 @@
 import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
 
 import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -33,15 +37,26 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Heartbeater;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.Progress;
 import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
 
 /**
  * TezJobMonitor keeps track of a tez job while it's being executed. It will
@@ -58,8 +73,32 @@
   private final int checkInterval = 200;
   private final int maxRetryInterval = 2500;
   private final int printInterval = 3000;
+  private final int progressBarChars = 30;
+  private final int progressPercentChars = 3;
+  private final int progressDivisor = 100 / Math.min(100, progressBarChars);
   private long lastPrintTime;
   private Set<String> completed;
+
+  /* Pretty print the values */
+  private final DecimalFormat msDurationFormatter;
+  private final DecimalFormat secondDurationFormatter;
+  private final DecimalFormat decimalFormatter;
+
+  private final String TOTAL_PREP_TIME = "TotalPrepTime";
+  private final String METHOD = "METHOD";
+  private final String DURATION = "DURATION_MS";
+  private final String VERTICES = "VERTICES";
+  private final String VERTEX_NAME = "VERTEX";
+  private final String TOTAL_TASKS = "TOTAL_TASKS";
+  private final String FAILED_TASKS = "FAILED_TASKS";
+  private final String KILLED_TASKS = "KILLED_TASKS";
+  private final String ELAPSED_TIME = "DURATION_SECONDS";
+  private final String CPU_TIME = "CPU_TIME_MILLIS";
+  private final String GC_TIME = "GC_TIME_MILLIS";
+  private final String INPUT_RECORDS = "INPUT_RECORDS";
+  private final String OUTPUT_RECORDS = "OUTPUT_RECORDS";
+  private final String OUTPUT_INTERMEDIATE_RECORDS = "OUTPUT_INTERMEDIATE_RECORDS";
+
   private static final List<DAGClient> shutdownList;
 
   static {
@@ -84,19 +123,23 @@ public void run() {
 
   public TezJobMonitor() {
     console = new LogHelper(LOG);
+    msDurationFormatter = new DecimalFormat("###,###,###,###,###,###");
+    secondDurationFormatter = new DecimalFormat("###,###,###,###,###,###.00");
+    decimalFormatter = new DecimalFormat("###,###,###,###,###,###,###");
   }
 
   /**
-   * monitorExecution handles status printing, failures during execution and final
-   * status retrieval.
+   * monitorExecution handles status printing, failures during execution and final status retrieval.
    *
    * @param dagClient client that was used to kick off the job
    * @param txnMgr transaction manager for this operation
    * @param conf configuration file for this operation
+   * @param jobConf job configuration, forwarded to the DAG summary printer
+   * @param dag the DAG being monitored, used to look up the input vertices of each vertex
    * @return int 0 - success, 1 - killed, 2 - failed
    */
-  public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
-      HiveConf conf) throws InterruptedException {
+  public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf,
+      JobConf jobConf, DAG dag) throws InterruptedException {
     DAGStatus status = null;
     completed = new HashSet<String>();
 
@@ -108,6 +151,8 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
     String lastReport = null;
     Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
     Heartbeater heartbeater = new Heartbeater(txnMgr, conf);
+    boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY);
+
     long startTime = 0;
 
     shutdownList.add(dagClient);
@@ -116,7 +161,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
 
-    while(true) {
+    while (true) {
 
       try {
         status = dagClient.getDAGStatus(opts);
@@ -127,7 +172,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
         if (state != lastState || state == RUNNING) {
           lastState = state;
 
-          switch(state) {
+          switch (state) {
           case SUBMITTED:
             console.printInfo("Status: Submitted");
             break;
@@ -138,9 +183,6 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
             if (!running) {
               perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
               console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n");
-              for (String s: progressMap.keySet()) {
-                perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
-              }
               startTime = System.currentTimeMillis();
               running = true;
             }
@@ -149,8 +191,20 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
             break;
           case SUCCEEDED:
             lastReport = printStatus(progressMap, lastReport, console);
-            double duration = (System.currentTimeMillis() - startTime)/1000.0;
-            console.printInfo("Status: Finished successfully in " + String.format("%.2f seconds", duration));
+
+            /* Always report completion; only the detailed breakdown below is optional */
+            double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+            console.printInfo("Status: DAG finished successfully in "
+                + String.format("%.2f seconds", duration));
+
+            /* Profile info is collected anyways, isProfileEnabled
+             * decides if it gets printed or not
+             */
+            if (isProfileEnabled) {
+              console.printInfo("\n");
+              printMethodsSummary();
+              printDagSummary(progressMap, console, dagClient, conf, jobConf, dag);
+            }
             running = false;
             done = true;
             break;
@@ -173,15 +227,15 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
           Thread.sleep(checkInterval);
         }
       } catch (Exception e) {
-        console.printInfo("Exception: "+e.getMessage());
-        if (++failedCounter % maxRetryInterval/checkInterval == 0
+        console.printInfo("Exception: " + e.getMessage());
+        if (++failedCounter % maxRetryInterval / checkInterval == 0
             || e instanceof InterruptedException) {
           try {
             console.printInfo("Killing DAG...");
             dagClient.tryKillDAG();
-          } catch(IOException io) {
+          } catch (IOException io) {
             // best effort
-          } catch(TezException te) {
+          } catch (TezException te) {
             // best effort
           }
           e.printStackTrace();
@@ -194,7 +248,7 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
       } finally {
         if (done) {
           if (rc != 0 && status != null) {
-            for (String diag: status.getDiagnostics()) {
+            for (String diag : status.getDiagnostics()) {
               console.printError(diag);
             }
           }
@@ -222,42 +276,329 @@ public static void killRunningJobs() {
     }
   }
 
+  /** Renders the completion percentage as a fixed-width text bar with the number in its middle. */
+  private String getProgressBar(int percent) {
+    StringBuilder bar = new StringBuilder("[");
+    String percentageString = String.format("%1$" + progressPercentChars + "s", percent) + "%";
+    int reservedCenterChars = percentageString.length() + 2;
+
+    int startPrintRange = (progressBarChars - reservedCenterChars) / 2;
+    int endPrintRange = (progressBarChars + reservedCenterChars) / 2 - 1;
+
+    for (int i = 0; i < progressBarChars; i++) {
+
+      if ((i < (percent / progressDivisor) && ((i < startPrintRange || i > endPrintRange)))) {
+        bar.append("=");
+      } else if (i == (progressBarChars - reservedCenterChars) / 2) {
+        bar.append(percentageString);
+        i += percentageString.length();
+      } else if (i == (percent / progressDivisor) && ((i < startPrintRange || i > endPrintRange))) {
+        bar.append(">");
+      } else {
+        bar.append(" ");
+      }
+    }
+
+    bar.append("] ");
+    return bar.toString();
+  }
+
+  /** Formats the vertex tally as "VERTICES: completed(+running)/total"; "(+running)" only if > 0. */
+  private String getAggregatedVerticesStatus(int verticesCount, int completedVertices, int runningVertices) {
+
+    StringBuilder reportBuffer = new StringBuilder();
+
+    if (runningVertices > 0) {
+      reportBuffer.append(String.format("%s: %d(+%d)/%d\t", VERTICES, completedVertices,
+          runningVertices, verticesCount));
+    } else {
+      reportBuffer.append(String
+          .format("%s: %d/%d\t", VERTICES, completedVertices, verticesCount));
+    }
+
+    return reportBuffer.toString();
+  }
+
+  /** Sums counterName over every counter group named exactly groupNamePattern; 0 when not found. */
+  private static long getCounterValueByGroupName(TezCounters vertexCounters, String groupNamePattern,
+      String counterName) {
+
+    TezCounter tezCounter;
+    long counterValue = 0;
+
+    for (String groupName : vertexCounters.getGroupNames()) {
+      if (groupName.equals(groupNamePattern)) {
+        tezCounter = vertexCounters.getGroup(groupName).findCounter(counterName);
+        counterValue += (tezCounter != null) ? tezCounter.getValue() : 0;
+      }
+    }
+
+    return counterValue;
+  }
+
+  /** Prints the PerfLogger duration of each preparation phase, followed by the total prep time. */
+  private void printMethodsSummary() {
+    double totalInPrepTime = 0;
+
+    String[] perfLoggerReportMethods = {
+        (PerfLogger.PARSE),
+        (PerfLogger.ANALYZE),
+        (PerfLogger.TEZ_BUILD_DAG),
+        (PerfLogger.TEZ_SUBMIT_TO_RUNNING)
+    };
+
+    /* Build the method summary header */
+    String methodBreakdownHeader = String.format("%-30s %-13s", METHOD, DURATION);
+    console.printInfo(methodBreakdownHeader);
+
+    for (String method : perfLoggerReportMethods) {
+      long duration = perfLogger.getDuration(method);
+      totalInPrepTime += duration;
+      console.printInfo(String.format("%-30s %11s", method, msDurationFormatter.format(duration)));
+    }
+
+    /*
+     * The counters list above don't capture the total time from TimeToSubmit.startTime till
+     * TezRunDag.startTime, so calculate the duration and print it.
+     */
+    totalInPrepTime =
+        perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG)
+            - perfLogger.getStartTime(PerfLogger.TIME_TO_SUBMIT);
+
+    console.printInfo(String.format("%-30s %11s", TOTAL_PREP_TIME,
+        msDurationFormatter.format(totalInPrepTime)));
+
+    console.printInfo("\n");
+  }
+
+  /** Prints one row per vertex: task counts, elapsed/CPU/GC time and input/output record counts. */
+  private void printDagSummary(Map<String, Progress> progressMap, LogHelper console,
+      DAGClient dagClient, HiveConf conf, JobConf jobConf, DAG dag) throws IOException,
+      TezException {
+
+    VertexStatus vertexStatus;
+    TezCounters vertexCounters;
+    TezCounter tezCounter;
+    double cpuTimeMillis = 0;
+    double gcTimeMillis = 0;
+    double hiveInputRecords = 0;
+    double hiveOutputRecords = 0;
+    double hiveOutputIntermediateRecords = 0;
+    double hiveInputRecordsFromOtherVertices = 0;
+    double duration = 0;
+
+    int totalTasks = 0;
+    int failedTasks = 0;
+    int killedTasks = 0;
+
+    /* Strings for headers and counters */
+    String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    String vertexExecutionStats;
+
+    /* Build the header for the per vertex summary */
+    String vertexSummaryHeader =
+        String.format("%-12s %-12s %-12s %-15s %-20s %-15s %-15s %-15s %-15s", VERTEX_NAME,
+            TOTAL_TASKS, FAILED_TASKS, KILLED_TASKS, ELAPSED_TIME, CPU_TIME, GC_TIME,
+            INPUT_RECORDS, OUTPUT_RECORDS);
+
+    /* Print the per Vertex summary */
+    console.printInfo(vertexSummaryHeader);
+
+    Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+    TezCounters hiveCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
+
+    /* If the counters are missing there is no point trying to print progress */
+    if (hiveCounters == null) {
+      return;
+    }
+
+    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+
+    Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+
+    for (String vertexName : keys) {
+      Progress progress = progressMap.get(vertexName);
+
+      /* Be extra careful not to hit NPE */
+      if (progress == null) {
+        continue;
+      }
+
+      totalTasks = progress.getTotalTaskCount();
+      failedTasks = progress.getFailedTaskCount();
+      killedTasks = progress.getKilledTaskCount();
+      hiveInputRecordsFromOtherVertices = 0;
+
+      duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName) / 1000.0;
+
+      vertexStatus = dagClient.getVertexStatus(vertexName, statusOptions);
+
+      if (vertexStatus == null) {
+        continue;
+      }
+
+      Vertex currentVertex = dag.getVertex(vertexName);
+      List<Vertex> inputVerticesList = currentVertex.getInputVertices();
+      if (inputVerticesList.size() > 0) {
+
+        for (Vertex inputVertex : inputVerticesList) {
+
+          String inputVertexName = inputVertex.getName();
+
+          hiveInputRecordsFromOtherVertices +=
+              getCounterValueByGroupName(
+                  hiveCounters,
+                  hiveCountersGroup,
+                  String.format("%s_",
+                      ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString())
+                      + inputVertexName.replace(" ", "_"));
+
+          hiveInputRecordsFromOtherVertices +=
+              getCounterValueByGroupName(hiveCounters, hiveCountersGroup,
+                  String.format("%s_", FileSinkOperator.Counter.RECORDS_OUT.toString())
+                      + inputVertexName.replace(" ", "_"));
+        }
+      }
+
+      /*
+       * Get the CPU & GC
+       *
+       * counters org.apache.tez.common.counters.TaskCounter
+       * GC_TIME_MILLIS=37712
+       * CPU_MILLISECONDS=2774230
+       */
+      vertexCounters = vertexStatus.getVertexCounters();
+
+      cpuTimeMillis =
+          getCounterValueByGroupName(vertexCounters, TaskCounter.class.getName(),
+              TaskCounter.CPU_MILLISECONDS.name());
+
+      gcTimeMillis =
+          getCounterValueByGroupName(vertexCounters, TaskCounter.class.getName(),
+              TaskCounter.GC_TIME_MILLIS.name());
+
+      /*
+       * Get the HIVE counters
+       *
+       * HIVE
+       * CREATED_FILES=1
+       * DESERIALIZE_ERRORS=0
+       * RECORDS_IN_Map_1=550076554
+       * RECORDS_OUT_INTERMEDIATE_Map_1=854987
+       * RECORDS_OUT_Reducer_2=1
+       */
+      hiveInputRecords =
+          getCounterValueByGroupName(
+              hiveCounters,
+              hiveCountersGroup,
+              String.format("%s_", MapOperator.Counter.RECORDS_IN.toString())
+                  + vertexName.replace(" ", "_"));
+
+      hiveInputRecords += hiveInputRecordsFromOtherVertices;
+
+      hiveOutputRecords =
+          getCounterValueByGroupName(
+              hiveCounters,
+              hiveCountersGroup,
+              String.format("%s_", FileSinkOperator.Counter.RECORDS_OUT.toString())
+                  + vertexName.replace(" ", "_"));
+
+      hiveOutputIntermediateRecords =
+          getCounterValueByGroupName(hiveCounters, hiveCountersGroup,
+              String.format("%s_", ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString())
+                  + vertexName.replace(" ", "_"));
+
+      hiveOutputRecords += hiveOutputIntermediateRecords;
+
+      vertexExecutionStats =
+          String.format("%-12s %11s %13s %12s %19s %19s %13s %15s %16s", vertexName,
+              Long.toString(totalTasks), Long.toString(failedTasks), Long.toString(killedTasks),
+              secondDurationFormatter.format((duration)), decimalFormatter.format(cpuTimeMillis),
+              decimalFormatter.format(gcTimeMillis), decimalFormatter.format(hiveInputRecords),
+              decimalFormatter.format(hiveOutputRecords));
+
+      console.printInfo(vertexExecutionStats);
+    }
+  }
+
   private String printStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) {
     StringBuffer reportBuffer = new StringBuffer();
+    StringBuffer runningVertices = new StringBuffer();
+    int progressPercentage = 0;
+    int sumComplete = 0;
+    int sumTotal = 0;
+
+    int runningVerticesCount = 0;
+    int initializingVerticesCount = 0;
 
     SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
-    for (String s: keys) {
+    for (String s : keys) {
       Progress progress = progressMap.get(s);
       final int complete = progress.getSucceededTaskCount();
       final int total = progress.getTotalTaskCount();
       final int running = progress.getRunningTaskCount();
       final int failed = progress.getFailedTaskCount();
+      sumComplete += complete;
+      sumTotal += total;
+
       if (total <= 0) {
-        reportBuffer.append(String.format("%s: -/-\t", s, complete, total));
+        initializingVerticesCount++;
       } else {
         if (complete == total && !completed.contains(s)) {
+
           completed.add(s);
+
+          /* We may have missed the start of the vertex
+           * due to the 3 seconds interval
+           */
+          if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+          }
+
           perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
         }
-        if(complete < total && (complete > 0 || running > 0 || failed > 0)) {
+
+        if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+
+          if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+          }
+
+          runningVerticesCount++;
+
           /* vertex is started, but not complete */
           if (failed > 0) {
-            reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
+            runningVertices.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running,
+                failed, total));
           } else {
-            reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
+            runningVertices.append(String.format("%s: %d(+%d)/%d\t", s, complete,
+                running, total));
           }
+
         } else {
-          /* vertex is waiting for input/slots or complete */
+
           if (failed > 0) {
             /* tasks finished but some failed */
-            reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
-          } else {
-            reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
+            runningVertices.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
           }
         }
       }
     }
 
+    progressPercentage = (sumTotal == 0) ? 0 : 100 * sumComplete / sumTotal;
+
+    String verticesSummary =
+        getAggregatedVerticesStatus(keys.size(), completed.size(), runningVerticesCount);
+
+    String progressBar = getProgressBar(progressPercentage);
+
+    reportBuffer.append(String.format("%-12s %-12s ", progressBar, verticesSummary));
+
+    if (runningVertices.length() > 0) {
+      reportBuffer.append(String.format("%s: ", "RUNNING")).append(runningVertices);
+    }
+
     String report = reportBuffer.toString();
     if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) {
       console.printInfo(report);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 93e0fac..a538fe2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -161,7 +161,7 @@ public int execute(DriverContext driverContext) {
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor();
-      rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf);
+      rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, jobConf, dag);
 
       // fetch the counters
       Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
diff --git ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index c4327fc..0325da6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -147,10 +147,42 @@ public void close(Log _log, QueryPlan queryPlan) {
   }
 
+  /** Returns the recorded start time of the method, or 0 when no start was recorded. */
   public Long getStartTime(String method) {
-    return startTimes.get(method);
+    Long startTime = 0L;
+
+    if (startTimes.containsKey(method)) {
+      startTime = startTimes.get(method);
+    }
+    return startTime;
   }
 
+  /** Returns the recorded end time of the method, or 0 when no end was recorded. */
   public Long getEndTime(String method) {
-    return endTimes.get(method);
+    Long endTime = 0L;
+
+    if (endTimes.containsKey(method)) {
+      endTime = endTimes.get(method);
+    }
+    return endTime;
   }
+
+  /** Returns true when a start time has been recorded for the method. */
+  public boolean startTimeHasMethod(String method) {
+    return startTimes.containsKey(method);
+  }
+
+  /** Returns true when an end time has been recorded for the method. */
+  public boolean endTimeHasMethod(String method) {
+    return endTimes.containsKey(method);
+  }
+
+  /** Returns end time minus start time of the method, or 0 unless both were recorded. */
+  public Long getDuration(String method) {
+    long duration = 0;
+    if (startTimes.containsKey(method) && endTimes.containsKey(method)) {
+      duration = endTimes.get(method) - startTimes.get(method);
+    }
+    return duration;
+  }
+
 }