diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 418a03e..243eed8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -73,7 +73,7 @@ private static final int COLUMN_1_WIDTH = 16; private static final int SEPARATOR_WIDTH = InPlaceUpdates.MIN_TERMINAL_WIDTH; private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-"); - private static final String PREP_SUMMARY_HEADER = "DAG Preparation Summary"; + private static final String QUERY_EXEC_SUMMARY_HEADER = "Query Execution Summary"; private static final String TASK_SUMMARY_HEADER = "Task Execution Summary"; private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary"; @@ -95,9 +95,11 @@ private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT, "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS", "ALLOCATION", "USED", "TOTAL_IO"); - private static final String TOTAL_PREP_TIME = "TotalPrepTime"; - private static final String METHOD = "METHOD"; - private static final String DURATION = "DURATION(ms)"; + + // Methods summary + private static final String OPERATION_SUMMARY = "%-35s %9s"; + private static final String OPERATION = "OPERATION"; + private static final String DURATION = "DURATION"; // in-place progress update related variables private int lines; @@ -214,6 +216,7 @@ public int monitorExecution(final DAGClient dagClient, HiveConf conf, boolean running = false; boolean done = false; + boolean success = false; int failedCounter = 0; int rc = 0; DAGStatus.State lastState = null; @@ -231,12 +234,12 @@ public int monitorExecution(final DAGClient dagClient, HiveConf conf, console.printInfo("\n"); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING); - + Map progressMap = null; while (true) { try { status = dagClient.getDAGStatus(opts, checkInterval); - Map progressMap = status.getVertexProgress(); + progressMap = status.getVertexProgress(); DAGStatus.State state = status.getState(); if (state != lastState || state == RUNNING) { @@ -277,35 +280,7 @@ public int monitorExecution(final DAGClient dagClient, HiveConf conf, } else { lastReport = printStatus(progressMap, lastReport, console); } - - /* Profile info is collected anyways, isProfileEnabled - * decides if it gets printed or not - */ - if (isProfileEnabled) { - - double duration = (System.currentTimeMillis() - startTime) / 1000.0; - console.printInfo("Status: DAG finished successfully in " - + String.format("%.2f seconds", duration)); - console.printInfo("\n"); - - console.printInfo(PREP_SUMMARY_HEADER); - printMethodsSummary(); - console.printInfo(SEPARATOR); - console.printInfo(""); - - console.printInfo(TASK_SUMMARY_HEADER); - printDagSummary(progressMap, console, dagClient, conf, dag); - console.printInfo(SEPARATOR); - console.printInfo(""); - - if (llapIoEnabled) { - console.printInfo(LLAP_IO_SUMMARY_HEADER); - printLlapIOSummary(progressMap, console, dagClient); - console.printInfo(SEPARATOR); - } - - console.printInfo("\n"); - } + success = true; running = false; done = true; break; @@ -376,6 +351,33 @@ public int monitorExecution(final DAGClient dagClient, HiveConf conf, } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); + + if (isProfileEnabled && success && progressMap != null) { + + double duration = (System.currentTimeMillis() - startTime) / 1000.0; + console.printInfo("Status: DAG finished successfully in " + + String.format("%.2f seconds", duration)); + console.printInfo("\n"); + + console.printInfo(QUERY_EXEC_SUMMARY_HEADER); + printQueryExecutionBreakDown(); + console.printInfo(SEPARATOR); + console.printInfo(""); + + console.printInfo(TASK_SUMMARY_HEADER); + printDagSummary(progressMap, console, dagClient, conf, dag); + console.printInfo(SEPARATOR); + console.printInfo(""); + + if (llapIoEnabled) { + console.printInfo(LLAP_IO_SUMMARY_HEADER); + printLlapIOSummary(progressMap, console, dagClient); + console.printInfo(SEPARATOR); + } + + console.printInfo("\n"); + } + return rc; } @@ -414,37 +416,42 @@ private static long getCounterValueByGroupName(TezCounters vertexCounters, return (tezCounter == null) ? 0 : tezCounter.getValue(); } - private void printMethodsSummary() { - long totalInPrepTime = 0; - - String[] perfLoggerReportMethods = { - (PerfLogger.PARSE), - (PerfLogger.ANALYZE), - (PerfLogger.TEZ_BUILD_DAG), - (PerfLogger.TEZ_SUBMIT_TO_RUNNING) - }; + private void printQueryExecutionBreakDown() { /* Build the method summary header */ - String methodBreakdownHeader = String.format("%-30s %-13s", METHOD, DURATION); + String execBreakdownHeader = String.format(OPERATION_SUMMARY, OPERATION, DURATION); console.printInfo(SEPARATOR); - reprintLineWithColorAsBold(methodBreakdownHeader, Ansi.Color.CYAN); + reprintLineWithColorAsBold(execBreakdownHeader, Ansi.Color.CYAN); console.printInfo(SEPARATOR); - for (String method : perfLoggerReportMethods) { - long duration = perfLogger.getDuration(method); - totalInPrepTime += duration; - console.printInfo(String.format("%-30s %11s", method, commaFormat.format(duration))); - } + // parse, analyze, optimize and compile + long compile = perfLogger.getDuration(PerfLogger.COMPILE); + console.printInfo(String.format(OPERATION_SUMMARY, "Compile Query", + secondsFormat.format(compile / 1000.0) + "s")); - /* - * The counters list above don't capture the total time from TimeToSubmit.startTime till - * TezRunDag.startTime, so calculate the duration and print it. - */ - totalInPrepTime = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) - + // total preparation time. query submit to dag submit + long totalDAGPrep = perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG) - perfLogger.getStartTime(PerfLogger.TIME_TO_SUBMIT); + console.printInfo(String.format(OPERATION_SUMMARY, "Prepare Plan", + secondsFormat.format(totalDAGPrep / 1000.0) + "s")); + + // submit to accept dag (if session is closed, this will include re-opening of session time) + long submitToAccept = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) - + perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG); + console.printInfo(String.format(OPERATION_SUMMARY, "Submit Plan", + secondsFormat.format(submitToAccept / 1000.0) + "s")); + + // accept to start dag + long acceptToStart = perfLogger.getDuration(PerfLogger.TEZ_SUBMIT_TO_RUNNING); + console.printInfo(String.format(OPERATION_SUMMARY, "Start", + secondsFormat.format(acceptToStart / 1000.0) + "s")); + + // time to actually run the dag + long startToEnd = perfLogger.getEndTime(PerfLogger.TEZ_RUN_DAG) - + perfLogger.getEndTime(PerfLogger.TEZ_SUBMIT_TO_RUNNING); + console.printInfo(String.format(OPERATION_SUMMARY, "Finish", + secondsFormat.format(startToEnd / 1000.0) + "s")); - console.printInfo(String.format("%-30s %11s", TOTAL_PREP_TIME, commaFormat.format( - totalInPrepTime))); } private void printDagSummary(Map progressMap, LogHelper console, @@ -582,7 +589,7 @@ private String humanReadableByteCount(long bytes) { } private void printLlapIOSummary(Map progressMap, LogHelper console, - DAGClient dagClient) throws Exception { + DAGClient dagClient) { SortedSet keys = new TreeSet<>(progressMap.keySet()); Set statusOptions = new HashSet<>(1); statusOptions.add(StatusGetOpts.GET_COUNTERS); @@ -593,8 +600,15 @@ private void printLlapIOSummary(Map progressMap, LogHelper con if (vertexName.startsWith("Reducer")) { continue; } - TezCounters vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions) - .getVertexCounters(); + TezCounters vertexCounters = null; + try { + vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions) + .getVertexCounters(); + } catch (IOException e) { + // best attempt, shouldn't really kill DAG for this + } catch (TezException e) { + // best attempt, shouldn't really kill DAG for this + } if (vertexCounters != null) { final long selectedRowgroups = getCounterValueByGroupName(vertexCounters, counterGroup, LlapIOCounters.SELECTED_ROWGROUPS.name());