diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java index 1400be4..140e208 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java @@ -63,20 +63,24 @@ private long hiveInputRecordsFromOtherVertices(String vertexName) { List inputVerticesList = dag.getVertex(vertexName).getInputVertices(); long result = 0; for (Vertex inputVertex : inputVerticesList) { - String intermediateRecordsCounterName = formattedName( - ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(), - inputVertex.getName() - ); - String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(), - inputVertex.getName()); - result += ( - hiveCounterValue(intermediateRecordsCounterName) - + hiveCounterValue(recordsOutCounterName) - ); + // Get the counters for the input vertex. + Set statusOptions = new HashSet<>(1); + statusOptions.add(StatusGetOpts.GET_COUNTERS); + VertexStatus inputVertexStatus = vertexStatus(statusOptions, inputVertex.getName()); + final TezCounters inputVertexCounters = inputVertexStatus.getVertexCounters(); + + // eg, group name TaskCounter_Map_7_OUTPUT_Reducer_8, counter name OUTPUT_RECORDS + String groupName = formattedName("TaskCounter", inputVertex.getName(), vertexName); + String counterName = "OUTPUT_RECORDS"; + result += getCounterValueByGroupName(inputVertexCounters, groupName, counterName); } return result; } + private String formattedName(String counterName, String srcVertexName, String destVertexName) { + return String.format("%s_", counterName) + srcVertexName.replace(" ", "_") + "_OUTPUT_" + destVertexName.replace(" ", "_"); + } + private String formattedName(String counterName, String vertexName) { return String.format("%s_", counterName) + vertexName.replace(" ", "_"); }