Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml	(revision 1178612)
+++ conf/hive-default.xml	(working copy)
@@ -1211,4 +1211,12 @@
   <description>String used as a file extension for output files. If not set, defaults to the codec extension for text files (e.g. ".gz"), or no extension otherwise.</description>
 </property>
 
+<property>
+  <name>hive.task.counters.enum</name>
+  <value>org.apache.hadoop.hive.ql.MapRedStats$TaskCounter</value>
+  <description>Name of an enum class whose constants select the task counters recorded in
+    MapRedStats for each job. The constant names should be a subset of those in
+    org.apache.hadoop.mapred.Task$Counter.</description>
+</property>
+
 </configuration>
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1178612)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -482,6 +482,9 @@
     HIVE_PERF_LOGGER("hive.exec.perf.logger", "org.apache.hadoop.hive.ql.log.PerfLogger"),
     // Whether to delete the scratchdir while startup
     HIVE_START_CLEANUP_SCRATCHDIR("hive.start.cleanup.scratchdir", false),
+
+    // Enum class whose constant names select the task counters recorded in MapRedStats
+    HIVE_TASK_COUNTERS_ENUM("hive.task.counters.enum", "org.apache.hadoop.hive.ql.MapRedStats$TaskCounter"),
     ;
 
     public final String varname;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java	(revision 1178612)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java	(working copy)
@@ -35,7 +35,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.MapRedStats.TaskCounter;
 import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
 import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
@@ -392,36 +393,33 @@
 
     Counter ctr;
 
-    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
-        "REDUCE_SHUFFLE_BYTES");
-    if (ctr != null) {
-      mapRedStats.setReduceShuffleBytes(ctr.getValue());
+    // Resolve the enum naming the task counters to record; fall back to
+    // MapRedStats.TaskCounter if the configured class is missing or is not an enum.
+    String taskCounterEnumName = HiveConf.getVar(job, HiveConf.ConfVars.HIVE_TASK_COUNTERS_ENUM);
+    Object[] taskCounterConstants = null;
+    try {
+      // getEnumConstants() returns null when the loaded class is not an enum type.
+      taskCounterConstants = job.getClassByName(taskCounterEnumName).getEnumConstants();
+      if (taskCounterConstants == null) {
+        LOG.error("Hive Task Counter class is not an enum: " + taskCounterEnumName);
+      }
+    } catch (ClassNotFoundException e) {
+      LOG.error("Hive Task Counter Enum not found: " + taskCounterEnumName, e);
     }
 
-    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
-        "MAP_INPUT_RECORDS");
-    if (ctr != null) {
-      mapRedStats.setMapInputRecords(ctr.getValue());
+    if (taskCounterConstants == null) {
+      taskCounterConstants = TaskCounter.values();
+    }
+    for (Object constant : taskCounterConstants) {
+      // Every element of getEnumConstants()/values() is an enum constant.
+      Enum<?> counter = (Enum<?>) constant;
+      ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+          counter.name());
+      if (ctr != null) {
+        mapRedStats.setTaskCounterValue(counter, ctr.getValue());
+      }
     }
 
-    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
-        "MAP_OUTPUT_RECORDS");
-    if (ctr != null) {
-      mapRedStats.setMapOutputRecords(ctr.getValue());
-    }
-
-    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
-        "REDUCE_INPUT_RECORDS");
-    if (ctr != null) {
-      mapRedStats.setReduceInputRecords(ctr.getValue());
-    }
-
-    ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
-        "REDUCE_OUTPUT_RECORDS");
-    if (ctr != null) {
-      mapRedStats.setReduceOutputRecords(ctr.getValue());
-    }
-
     ctr = ctrs.findCounter("FileSystemCounters",
         "HDFS_BYTES_READ");
     if (ctr != null) {
Index: ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java	(revision 1178612)
+++ ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java	(working copy)
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * MapRedStats.
  *
@@ -32,13 +35,21 @@
   long cpuMSec;
   long hdfsRead = -1;
   long hdfsWrite = -1;
-  long mapInputRecords = -1;
-  long mapOutputRecords = -1;
-  long reduceInputRecords = -1;
-  long reduceOutputRecords = -1;
-  long reduceShuffleBytes = -1;
+  // Most recent value of each recorded task counter, keyed by its enum constant.
+  Map<Enum<?>, Long> taskCounters = new HashMap<Enum<?>, Long>();
   boolean success;
 
+  // Default set of task counters recorded in MapRedStats (see hive.task.counters.enum).
+  // The constant names should be a subset of org.apache.hadoop.mapred.Task.Counter, which
+  // cannot be referenced directly because Task is package-private in the current Hadoop version.
+  public enum TaskCounter {
+    MAP_INPUT_RECORDS,
+    MAP_OUTPUT_RECORDS,
+    REDUCE_SHUFFLE_BYTES,
+    REDUCE_INPUT_RECORDS,
+    REDUCE_OUTPUT_RECORDS
+  }
+
   String jobId;
 
   public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess, String jobId) {
@@ -89,46 +100,22 @@
     this.hdfsWrite = hdfsWrite;
   }
 
-  public long getMapInputRecords() {
-    return mapInputRecords;
+  /**
+   * Returns the value most recently stored for {@code counter} by setTaskCounterValue,
+   * or -1 if no value has been stored for that counter.
+   */
+  public long getTaskCounterValue(Enum<?> counter) {
+    Long value = taskCounters.get(counter);
+    return value == null ? -1 : value.longValue();
   }
 
-  public void setMapInputRecords(long mapInputRecords) {
-    this.mapInputRecords = mapInputRecords;
+  /**
+   * Stores {@code value} for {@code counter}, replacing any previously stored value.
+   */
+  public void setTaskCounterValue(Enum<?> counter, long value) {
+    taskCounters.put(counter, value);
   }
 
-  public long getMapOutputRecords() {
-    return mapOutputRecords;
-  }
-
-  public void setMapOutputRecords(long mapOutputRecords) {
-    this.mapOutputRecords = mapOutputRecords;
-  }
-
-  public long getReduceInputRecords() {
-    return reduceInputRecords;
-  }
-
-  public void setReduceInputRecords(long reduceInputRecords) {
-    this.reduceInputRecords = reduceInputRecords;
-  }
-
-  public long getReduceOutputRecords() {
-    return reduceOutputRecords;
-  }
-
-  public void setReduceOutputRecords(long reduceOutputRecords) {
-    this.reduceOutputRecords = reduceOutputRecords;
-  }
-
-  public long getReduceShuffleBytes() {
-    return reduceShuffleBytes;
-  }
-
-  public void setReduceShuffleBytes(long reduceShuffleBytes) {
-    this.reduceShuffleBytes = reduceShuffleBytes;
-  }
-
   public void setCpuMSec(long cpuMSec) {
     this.cpuMSec = cpuMSec;
   }