diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 422c6b2..94c875b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -178,6 +178,7 @@ public class HiveConf extends Configuration { PREEXECHOOKS("hive.exec.pre.hooks", ""), POSTEXECHOOKS("hive.exec.post.hooks", ""), ONFAILUREHOOKS("hive.exec.failure.hooks", ""), + OPERATORHOOKS("hive.exec.operator.hooks", ""), CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""), EXECPARALLEL("hive.exec.parallel", false), // parallel query launching EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java index 9a8c237..86f2feb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java @@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.net.URLClassLoader; import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -50,6 +51,7 @@ public class ExecMapper extends MapReduceBase implements Mapper { private JobConf jc; private boolean abort = false; private Reporter rp; + private List<OperatorHook> opHooks; public static final Log l4j = LogFactory.getLog("ExecMapper"); private static boolean done; @@ -96,6 +98,7 @@ public class ExecMapper extends MapReduceBase implements Mapper { mo.setExecContext(execContext); mo.initializeLocalWork(jc); mo.initialize(jc, null); + opHooks = OperatorHookUtils.getOperatorHooks(jc); if (localWork == null) { return; @@ -130,6 +133,7 @@ public class ExecMapper extends MapReduceBase implements Mapper { rp = reporter; mo.setOutputCollector(oc); mo.setReporter(rp); + mo.setOperatorHooks(opHooks); } // reset the execContext for
each new row execContext.resetRow(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java index 18a9bd2..e63b5ce 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java @@ -25,6 +25,7 @@ import java.net.URLClassLoader; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +66,7 @@ public class ExecReducer extends MapReduceBase implements Reducer { private long nextCntr = 1; private static String[] fieldNames; + private List<OperatorHook> opHooks; public static final Log l4j = LogFactory.getLog("ExecReducer"); private boolean isLogInfoEnabled = false; @@ -148,6 +150,7 @@ public class ExecReducer extends MapReduceBase implements Reducer { try { l4j.info(reducer.dump(0)); reducer.initialize(jc, rowObjectInspector); + opHooks = OperatorHookUtils.getOperatorHooks(jc); } catch (Throwable e) { abort = true; if (e instanceof OutOfMemoryError) { @@ -178,6 +181,7 @@ public class ExecReducer extends MapReduceBase implements Reducer { rp = reporter; reducer.setOutputCollector(oc); reducer.setReporter(rp); + reducer.setOperatorHooks(opHooks); } try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 68302f8..07f4460 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -53,6 +53,7 @@ public abstract class Operator implements Serializable,C // Bean methods private static final long serialVersionUID = 1L; + List<OperatorHook> operatorHooks; protected List<Operator<? extends Serializable>> childOperators; protected List<Operator<? extends Serializable>> parentOperators; @@ -226,6 +227,17 @@ public abstract class Operator implements Serializable,C return id; } + public void setOperatorHooks(List<OperatorHook> opHooks) { + 
operatorHooks = opHooks; + if (childOperators == null) { + return; + } + + for (Operator<? extends Serializable> op : childOperators) { + op.setOperatorHooks(opHooks); + } + } + public void setReporter(Reporter rep) { reporter = rep; @@ -409,6 +421,24 @@ public abstract class Operator implements Serializable,C } } + private void enterOperatorHooks() throws HiveException { + if (this.operatorHooks == null) { + return; + } + for(OperatorHook opHook : this.operatorHooks) { + opHook.enter(); + } + } + + private void exitOperatorHooks() throws HiveException { + if (this.operatorHooks == null) { + return; + } + for(OperatorHook opHook : this.operatorHooks) { + opHook.exit(); + } + } + /** * Collects all the parent's output object inspectors and calls actual * initialization method. @@ -471,7 +501,12 @@ public abstract class Operator implements Serializable,C return; } preProcessCounter(); - processOp(row, tag); + enterOperatorHooks(); + try { + processOp(row, tag); + } finally { + exitOperatorHooks(); + } postProcessCounter(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java new file mode 100644 index 0000000..3ee8d28 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java @@ -0,0 +1,12 @@ +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +/** + * OperatorHook is a new interface which + * executes pre/post each Operator execution in a map/reduce stage + */ + +public interface OperatorHook { + public void enter() throws HiveException; + public void exit() throws HiveException; +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java new file mode 100644 index 0000000..c62e86e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java @@ -0,0 +1,35 @@ + +package org.apache.hadoop.hive.ql.exec; +import java.util.List; +import java.util.ArrayList; + +import
org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +public class OperatorHookUtils { + + public static final Log LOG = LogFactory.getLog("OperatorHookUtils"); + public static List<OperatorHook> getOperatorHooks(Configuration hconf) + throws Exception { + List<OperatorHook> opHooks = new ArrayList<OperatorHook>(); + String csOpHooks = HiveConf.getVar(hconf, HiveConf.ConfVars.OPERATORHOOKS); + if (csOpHooks == null || csOpHooks.trim().isEmpty()) { + return opHooks; + } + String[] opHookClasses = csOpHooks.split(","); + for(String opHookClass: opHookClasses) { + try { + + OperatorHook opHook = + (OperatorHook) Class.forName(opHookClass.trim(), true, + JavaUtils.getClassLoader()).newInstance(); + opHooks.add(opHook); + } catch (ClassNotFoundException e) { + LOG.error(opHookClass + " Class not found: " + e.getMessage()); + } + } + return opHooks; + } +}