Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (revision 1178612)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (working copy)
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.MapRedStats.TaskCounter;
 import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
 import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
@@ -392,36 +393,14 @@
       Counter ctr;

-      ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
-          "REDUCE_SHUFFLE_BYTES");
-      if (ctr != null) {
-        mapRedStats.setReduceShuffleBytes(ctr.getValue());
+      for (TaskCounter counter : TaskCounter.values()) {
+        ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+            counter.name());
+        if (ctr != null) {
+          mapRedStats.setTaskCounterValue(counter, ctr.getValue());
+        }
       }

-      ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
-          "MAP_INPUT_RECORDS");
-      if (ctr != null) {
-        mapRedStats.setMapInputRecords(ctr.getValue());
-      }
-
-      ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
-          "MAP_OUTPUT_RECORDS");
-      if (ctr != null) {
-        mapRedStats.setMapOutputRecords(ctr.getValue());
-      }
-
-      ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
-          "REDUCE_INPUT_RECORDS");
-      if (ctr != null) {
-        mapRedStats.setReduceInputRecords(ctr.getValue());
-      }
-
-      ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
-          "REDUCE_OUTPUT_RECORDS");
-      if (ctr != null) {
-        mapRedStats.setReduceOutputRecords(ctr.getValue());
-      }
-
       ctr = ctrs.findCounter("FileSystemCounters", "HDFS_BYTES_READ");
       if (ctr != null) {
Index: ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java (revision 1178612)
+++ ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java (working copy)
@@ -18,6 +18,9 @@

 package org.apache.hadoop.hive.ql;

+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * MapRedStats.
  *
@@ -32,13 +35,44 @@
   long cpuMSec;
   long hdfsRead = -1;
   long hdfsWrite = -1;
-  long mapInputRecords = -1;
-  long mapOutputRecords = -1;
-  long reduceInputRecords = -1;
-  long reduceOutputRecords = -1;
-  long reduceShuffleBytes = -1;
+  Map<TaskCounter, Long> taskCounters = new HashMap<TaskCounter, Long>();
   boolean success;

+  // This should correspond to a subset of org.apache.hadoop.mapred.Task.Counter; in the current
+  // version of Hadoop, Task is package-private and hence inaccessible.
+  public enum TaskCounter {
+    MAP_INPUT_RECORDS,
+    MAP_OUTPUT_RECORDS,
+    MAP_SKIPPED_RECORDS,
+    MAP_INPUT_BYTES,
+    MAP_OUTPUT_BYTES,
+    MAP_SPILL_CPU,
+    MAP_SPILL_WALLCLOCK,
+    MAP_SPILL_NUMBER,
+    MAP_SPILL_BYTES,
+    MAP_MEM_SORT_CPU,
+    MAP_MEM_SORT_WALLCLOCK,
+    MAP_MERGE_CPU,
+    MAP_MERGE_WALLCLOCK,
+    COMBINE_INPUT_RECORDS,
+    COMBINE_OUTPUT_RECORDS,
+    REDUCE_INPUT_GROUPS,
+    REDUCE_SHUFFLE_BYTES,
+    REDUCE_COPY_WALLCLOCK,
+    REDUCE_COPY_CPU,
+    REDUCE_SORT_WALLCLOCK,
+    REDUCE_SORT_CPU,
+    REDUCE_INPUT_RECORDS,
+    REDUCE_OUTPUT_RECORDS,
+    REDUCE_SKIPPED_GROUPS,
+    REDUCE_SKIPPED_RECORDS,
+    MAP_TASK_WALLCLOCK,
+    REDUCE_TASK_WALLCLOCK,
+    SPILLED_RECORDS,
+    PHYSICAL_MEMORY_BYTES,
+    VIRTUAL_MEMORY_BYTES
+  }
+
   String jobId;

   public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess, String jobId) {
@@ -89,46 +123,20 @@
     this.hdfsWrite = hdfsWrite;
   }

-  public long getMapInputRecords() {
-    return mapInputRecords;
+  // Returns the value recorded by the last call to setTaskCounterValue for this counter,
+  // or -1 if the counter is not in the map.
+  public long getTaskCounterValue(TaskCounter counter) {
+    if (this.taskCounters.containsKey(counter)) {
+      return this.taskCounters.get(counter);
+    } else {
+      return -1;
+    }
   }

-  public void setMapInputRecords(long mapInputRecords) {
-    this.mapInputRecords = mapInputRecords;
+  public void setTaskCounterValue(TaskCounter counter, long value) {
+    this.taskCounters.put(counter, value);
   }

-  public long getMapOutputRecords() {
-    return mapOutputRecords;
-  }
-
-  public void setMapOutputRecords(long mapOutputRecords) {
-    this.mapOutputRecords = mapOutputRecords;
-  }
-
-  public long getReduceInputRecords() {
-    return reduceInputRecords;
-  }
-
-  public void setReduceInputRecords(long reduceInputRecords) {
-    this.reduceInputRecords = reduceInputRecords;
-  }
-
-  public long getReduceOutputRecords() {
-    return reduceOutputRecords;
-  }
-
-  public void setReduceOutputRecords(long reduceOutputRecords) {
-    this.reduceOutputRecords = reduceOutputRecords;
-  }
-
-  public long getReduceShuffleBytes() {
-    return reduceShuffleBytes;
-  }
-
-  public void setReduceShuffleBytes(long reduceShuffleBytes) {
-    this.reduceShuffleBytes = reduceShuffleBytes;
-  }
-
   public void setCpuMSec(long cpuMSec) {
     this.cpuMSec = cpuMSec;
   }
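
Usage note (illustration only, not part of the patch): the sketch below exercises the refactored, enum-keyed counter API that the MapRedStats hunks above introduce. The class name TaskCounterReport, the constructor arguments, and the job id are made up for the example; the MapRedStats constructor signature, setTaskCounterValue, getTaskCounterValue, and the -1 return for unset counters are taken from the patch.

// Hypothetical illustration class, not part of this patch.
import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.MapRedStats.TaskCounter;

public class TaskCounterReport {
  public static void main(String[] args) {
    // Constructor signature as shown in the patch context:
    // MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess, String jobId)
    MapRedStats stats = new MapRedStats(10, 2, 5000L, true, "job_000000000000_0000");

    // In HadoopJobExecHelper the values come from ctrs.findCounter(...); set one by hand here.
    stats.setTaskCounterValue(TaskCounter.REDUCE_SHUFFLE_BYTES, 1024L);

    // getTaskCounterValue returns -1 for counters that were never set, so only
    // counters that were actually populated are printed.
    for (TaskCounter counter : TaskCounter.values()) {
      long value = stats.getTaskCounterValue(counter);
      if (value != -1) {
        System.out.println(counter.name() + " = " + value);
      }
    }
  }
}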