diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 284acbc..7cec650 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -83,8 +83,8 @@ "VERTICES", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED"); // method and dag summary format - private static final String SUMMARY_HEADER_FORMAT = "%-16s %-12s %-12s %-15s %-20s %-15s %-15s %-15s %-15s"; - private static final String SUMMARY_VERTEX_FORMAT = "%-12s %11s %13s %12s %19s %19s %13s %15s %16s"; + private static final String SUMMARY_HEADER_FORMAT = "%-16s %-12s %-12s %-12s %-19s %-19s %-15s %-15s %-15s"; + private static final String SUMMARY_VERTEX_FORMAT = "%-16s %11s %16s %12s %16s %18s %18s %14s %16s"; private static final String SUMMARY_HEADER = String.format(SUMMARY_HEADER_FORMAT, "VERTICES", "TOTAL_TASKS", "FAILED_ATTEMPTS", "KILLED_TASKS", "DURATION_SECONDS", "CPU_TIME_MILLIS", "GC_TIME_MILLIS", "INPUT_RECORDS", "OUTPUT_RECORDS"); @@ -539,12 +539,27 @@ private void printDagSummary(Map progressMap, LogHelper consol * RECORDS_OUT_INTERMEDIATE_Map_1=854987 * RECORDS_OUT_Reducer_2=1 */ - final long hiveInputRecords = getCounterValueByGroupName(hiveCounters, hiveCountersGroup, - MapOperator.Counter.RECORDS_IN.toString()) + hiveInputRecordsFromOtherVertices; - final long hiveOutputIntermediateRecords = getCounterValueByGroupName(hiveCounters, - hiveCountersGroup, ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()); - final long hiveOutputRecords = getCounterValueByGroupName(hiveCounters, hiveCountersGroup, - FileSinkOperator.Counter.RECORDS_OUT.toString()) + hiveOutputIntermediateRecords; + + final long hiveInputRecords = + getCounterValueByGroupName( + hiveCounters, + hiveCountersGroup, + String.format("%s_", MapOperator.Counter.RECORDS_IN.toString()) + + vertexName.replace(" ", "_")) + + hiveInputRecordsFromOtherVertices; + final long hiveOutputIntermediateRecords = + getCounterValueByGroupName( + hiveCounters, + hiveCountersGroup, + String.format("%s_", ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()) + + vertexName.replace(" ", "_")); + final long hiveOutputRecords = + getCounterValueByGroupName( + hiveCounters, + hiveCountersGroup, + String.format("%s_", FileSinkOperator.Counter.RECORDS_OUT.toString()) + + vertexName.replace(" ", "_")) + + hiveOutputIntermediateRecords; String vertexExecutionStats = String.format(SUMMARY_VERTEX_FORMAT, vertexName, @@ -741,9 +756,22 @@ private String getReport(Map progressMap) { } else { if (complete == total && !completed.contains(s)) { completed.add(s); + + /* + * We may have missed the start of the vertex due to the 3 seconds interval + */ + if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); } if(complete < total && (complete > 0 || running > 0 || failed > 0)) { + + if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s); + } + /* vertex is started, but not complete */ if (failed > 0) { reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));