Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1178612)
+++ conf/hive-default.xml (working copy)
@@ -1211,4 +1211,12 @@
String used as a file extension for output files. If not set, defaults to the codec extension for text files (e.g. ".gz"), or no extension otherwise.
+
+  <property>
+    <name>hive.task.counters.enum</name>
+    <value>org.apache.hadoop.hive.ql.MapRedStats$TaskCounter</value>
+    <description>Class name of an enum containing the counters we want to log as part of the
+    MapRedStats class. The names of the values of this enum should be a subset of those in
+    org.apache.hadoop.mapred.Task$Counter</description>
+  </property>
+
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1178612)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -482,6 +482,8 @@
HIVE_PERF_LOGGER("hive.exec.perf.logger", "org.apache.hadoop.hive.ql.log.PerfLogger"),
// Whether to delete the scratchdir while startup
HIVE_START_CLEANUP_SCRATCHDIR("hive.start.cleanup.scratchdir", false),
+
+  HIVE_TASK_COUNTERS_ENUM("hive.task.counters.enum", "org.apache.hadoop.hive.ql.MapRedStats$TaskCounter")
;
public final String varname;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (revision 1178612)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (working copy)
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
@@ -35,7 +37,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.MapRedStats.TaskCounter;
import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
@@ -51,6 +55,7 @@
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.FileAppender;
@@ -392,36 +397,22 @@
Counter ctr;
- ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_SHUFFLE_BYTES");
- if (ctr != null) {
- mapRedStats.setReduceShuffleBytes(ctr.getValue());
+      Class<? extends Enum> taskCounterEnum;
+      try {
+        taskCounterEnum = (Class<? extends Enum>) job.getClassByName(HiveConf.getVar(job, HiveConf.ConfVars.HIVE_TASK_COUNTERS_ENUM));
+      } catch (ClassNotFoundException e) {
+        LOG.error("Hive Task Counter Enum not found:" + HiveConf.getVar(job, HiveConf.ConfVars.HIVE_TASK_COUNTERS_ENUM));
+        taskCounterEnum = TaskCounter.class;
}
- ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
- "MAP_INPUT_RECORDS");
- if (ctr != null) {
- mapRedStats.setMapInputRecords(ctr.getValue());
+ for (Enum counter : taskCounterEnum.getEnumConstants()) {
+ ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
+ counter.name());
+ if (ctr != null) {
+ mapRedStats.setTaskCounterValue(counter, ctr.getValue());
+ }
}
- ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
- "MAP_OUTPUT_RECORDS");
- if (ctr != null) {
- mapRedStats.setMapOutputRecords(ctr.getValue());
- }
-
- ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_INPUT_RECORDS");
- if (ctr != null) {
- mapRedStats.setReduceInputRecords(ctr.getValue());
- }
-
- ctr = ctrs.findCounter("org.apache.hadoop.mapred.Task$Counter",
- "REDUCE_OUTPUT_RECORDS");
- if (ctr != null) {
- mapRedStats.setReduceOutputRecords(ctr.getValue());
- }
-
ctr = ctrs.findCounter("FileSystemCounters",
"HDFS_BYTES_READ");
if (ctr != null) {
Index: ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java (revision 1178612)
+++ ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java (working copy)
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.ql;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* MapRedStats.
*
@@ -32,13 +35,19 @@
long cpuMSec;
long hdfsRead = -1;
long hdfsWrite = -1;
- long mapInputRecords = -1;
- long mapOutputRecords = -1;
- long reduceInputRecords = -1;
- long reduceOutputRecords = -1;
- long reduceShuffleBytes = -1;
+  Map<Enum, Long> taskCounters = new HashMap<Enum, Long>();
boolean success;
+  // This should correspond to a subset of org.apache.hadoop.mapred.Task.Counter; in the current
+  // version of Hadoop, Task is package-private and hence inaccessible, so the names are mirrored here.
+ public enum TaskCounter {
+ MAP_INPUT_RECORDS,
+ MAP_OUTPUT_RECORDS,
+ REDUCE_SHUFFLE_BYTES,
+ REDUCE_INPUT_RECORDS,
+ REDUCE_OUTPUT_RECORDS,
+ }
+
String jobId;
public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess, String jobId) {
@@ -89,46 +98,20 @@
this.hdfsWrite = hdfsWrite;
}
- public long getMapInputRecords() {
- return mapInputRecords;
+ // Returns the value of the counter from the last call to setTaskCounterValue with that counter
+ // Returns -1 if that counter is not in the map
+ public long getTaskCounterValue(Enum counter) {
+ if (this.taskCounters.containsKey(counter)) {
+ return this.taskCounters.get(counter);
+ } else {
+ return -1;
+ }
}
- public void setMapInputRecords(long mapInputRecords) {
- this.mapInputRecords = mapInputRecords;
+ public void setTaskCounterValue(Enum counter, long value) {
+ this.taskCounters.put(counter, value);
}
- public long getMapOutputRecords() {
- return mapOutputRecords;
- }
-
- public void setMapOutputRecords(long mapOutputRecords) {
- this.mapOutputRecords = mapOutputRecords;
- }
-
- public long getReduceInputRecords() {
- return reduceInputRecords;
- }
-
- public void setReduceInputRecords(long reduceInputRecords) {
- this.reduceInputRecords = reduceInputRecords;
- }
-
- public long getReduceOutputRecords() {
- return reduceOutputRecords;
- }
-
- public void setReduceOutputRecords(long reduceOutputRecords) {
- this.reduceOutputRecords = reduceOutputRecords;
- }
-
- public long getReduceShuffleBytes() {
- return reduceShuffleBytes;
- }
-
- public void setReduceShuffleBytes(long reduceShuffleBytes) {
- this.reduceShuffleBytes = reduceShuffleBytes;
- }
-
public void setCpuMSec(long cpuMSec) {
this.cpuMSec = cpuMSec;
}