diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
index 1400be4..6982fb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
@@ -11,6 +11,7 @@
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -59,24 +60,84 @@
     this.hiveCounters = hiveCounters(dagClient);
   }
 
+  /**
+   * Looks up how many records {@code inputVertexName} sent to {@code vertexName} using the
+   * Tez per-edge task counters of the input vertex.
+   *
+   * @param vertexName name of the destination vertex
+   * @param inputVertexName name of the source (input) vertex
+   * @return the OUTPUT_RECORDS value for that edge, or -1 when it cannot be determined
+   */
+  private long hiveInputRecordsFromTezCounters(String vertexName, String inputVertexName) {
+    // Get the counters for the input vertex.
+    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+    VertexStatus inputVertexStatus = vertexStatus(statusOptions, inputVertexName);
+    if (inputVertexStatus == null) {
+      // vertexStatus(...) may yield null (TODO confirm); -1 makes the caller fall back.
+      return -1;
+    }
+    final TezCounters inputVertexCounters = inputVertexStatus.getVertexCounters();
+    if (inputVertexCounters == null) {
+      return -1;
+    }
+
+    // eg, group name TaskCounter_Map_7_OUTPUT_Reducer_8, counter name OUTPUT_RECORDS
+    String groupName = formattedName("TaskCounter", inputVertexName, vertexName);
+    String counterName = "OUTPUT_RECORDS";
+
+    // Do not create counter if it does not exist -
+    // instead fall back to default behavior for determining input records.
+    TezCounter tezCounter = inputVertexCounters.getGroup(groupName).findCounter(counterName, false);
+    if (tezCounter == null) {
+      return -1;
+    } else {
+      return tezCounter.getValue();
+    }
+  }
+
+  /**
+   * Sums the Hive RECORDS_OUT_INTERMEDIATE and RECORDS_OUT counter values of the input vertex.
+   *
+   * @param inputVertexName name of the source (input) vertex
+   * @return the sum of the two Hive counter values for that vertex
+   */
+  private long hiveInputRecordsFromHiveCounters(String inputVertexName) {
+    // The record count from these counters may not be correct if the input vertex has
+    // edges to more than one vertex, since this value counts the records going to all
+    // destination vertices.
+
+    String intermediateRecordsCounterName = formattedName(
+        ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
+        inputVertexName
+    );
+    String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
+        inputVertexName);
+    return hiveCounterValue(intermediateRecordsCounterName) + hiveCounterValue(recordsOutCounterName);
+  }
+
   private long hiveInputRecordsFromOtherVertices(String vertexName) {
     List<Vertex> inputVerticesList = dag.getVertex(vertexName).getInputVertices();
     long result = 0;
     for (Vertex inputVertex : inputVerticesList) {
-      String intermediateRecordsCounterName = formattedName(
-          ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
-          inputVertex.getName()
-      );
-      String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
-          inputVertex.getName());
-      result += (
-          hiveCounterValue(intermediateRecordsCounterName)
-              + hiveCounterValue(recordsOutCounterName)
-      );
+      long inputVertexRecords = hiveInputRecordsFromTezCounters(vertexName, inputVertex.getName());
+      if (inputVertexRecords < 0) {
+        inputVertexRecords = hiveInputRecordsFromHiveCounters(inputVertex.getName());
+      }
+      result += inputVertexRecords;
     }
     return result;
   }
 
+  /**
+   * Builds a per-edge counter group name of the form counterName_src_OUTPUT_dest, with
+   * spaces in both vertex names replaced by underscores.
+   */
+  private String formattedName(String counterName, String srcVertexName, String destVertexName) {
+    return String.format("%s_", counterName) + srcVertexName.replace(" ", "_")
+        + "_OUTPUT_" + destVertexName.replace(" ", "_");
+  }
+
   private String formattedName(String counterName, String vertexName) {
     return String.format("%s_", counterName) + vertexName.replace(" ", "_");
   }