diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 6bf8219..82e0dc5 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -178,6 +178,7 @@ PREEXECHOOKS("hive.exec.pre.hooks", ""), POSTEXECHOOKS("hive.exec.post.hooks", ""), ONFAILUREHOOKS("hive.exec.failure.hooks", ""), + OPERATORHOOKS("hive.exec.operator.hooks", ""), CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""), EXECPARALLEL("hive.exec.parallel", false), // parallel query launching EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java index 9a8c237..86f2feb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java @@ -23,6 +23,7 @@ import java.lang.management.MemoryMXBean; import java.net.URLClassLoader; import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -50,6 +51,7 @@ private JobConf jc; private boolean abort = false; private Reporter rp; + private List opHooks; public static final Log l4j = LogFactory.getLog("ExecMapper"); private static boolean done; @@ -96,6 +98,7 @@ public void configure(JobConf job) { mo.setExecContext(execContext); mo.initializeLocalWork(jc); mo.initialize(jc, null); + opHooks = OperatorHookUtils.getOperatorHooks(jc); if (localWork == null) { return; @@ -130,6 +133,7 @@ public void map(Object key, Object value, OutputCollector output, rp = reporter; mo.setOutputCollector(oc); mo.setReporter(rp); + mo.setOperatorHooks(opHooks); } // reset the execContext for each new row execContext.resetRow(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java index 18a9bd2..e63b5ce 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +66,7 @@ private long nextCntr = 1; private static String[] fieldNames; + private List opHooks; public static final Log l4j = LogFactory.getLog("ExecReducer"); private boolean isLogInfoEnabled = false; @@ -148,6 +150,7 @@ public void configure(JobConf job) { try { l4j.info(reducer.dump(0)); reducer.initialize(jc, rowObjectInspector); + opHooks = OperatorHookUtils.getOperatorHooks(jc); } catch (Throwable e) { abort = true; if (e instanceof OutOfMemoryError) { @@ -178,6 +181,7 @@ public void reduce(Object key, Iterator values, OutputCollector output, rp = reporter; reducer.setOutputCollector(oc); reducer.setReporter(rp); + reducer.setOperatorHooks(opHooks); } try { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 68302f8..7779885 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -53,6 +53,7 @@ // Bean methods private static final long serialVersionUID = 1L; + List operatorHooks; protected List> childOperators; protected List> parentOperators; @@ -226,6 +227,17 @@ public String getIdentifier() { return id; } + public void setOperatorHooks(List opHooks){ + operatorHooks = opHooks; + if (childOperators == null) { + return; + } + + for (Operator op : childOperators) { + op.setOperatorHooks(opHooks); + } + } + public void setReporter(Reporter rep) { reporter = rep; @@ -409,6 +421,24 @@ public void passExecContext(ExecMapperContext execContext) { } } + private void enterOperatorHooks(OperatorHookContext opHookContext) throws HiveException { + if (this.operatorHooks == null) { + 
return; + } + for(OperatorHook opHook : this.operatorHooks) { + opHook.enter(opHookContext); + } + } + + private void exitOperatorHooks(OperatorHookContext opHookContext) throws HiveException { + if (this.operatorHooks == null) { + return; + } + for(OperatorHook opHook : this.operatorHooks) { + opHook.exit(opHookContext); + } + } + /** * Collects all the parent's output object inspectors and calls actual * initialization method. @@ -470,8 +500,12 @@ public void process(Object row, int tag) throws HiveException { if (fatalError) { return; } + OperatorHookContext opHookContext = new OperatorHookContext(getName(), + getIdentifier(), row, tag); preProcessCounter(); + enterOperatorHooks(opHookContext); processOp(row, tag); + exitOperatorHooks(opHookContext); postProcessCounter(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java new file mode 100644 index 0000000..bc98b10 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java @@ -0,0 +1,12 @@ +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +/** + * OperatorHook is a new interface which + * executes pre/post each Operator execution in a map/reduce stage + */ + +public interface OperatorHook { + public void enter(OperatorHookContext opHookContext) throws HiveException; + public void exit(OperatorHookContext opHookContext) throws HiveException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java new file mode 100644 index 0000000..31543bb --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java @@ -0,0 +1,38 @@ +package org.apache.hadoop.hive.ql.exec; + +public class OperatorHookContext { + private String operatorName; + private String operatorId; + private Object currentRow; + private int parentId; + public OperatorHookContext(String 
opName, String opId, Object row, int tag) { + operatorName = opName; + operatorId = opId; + currentRow = row; + parentId = tag; + } + + public String getOperatorName() { + return operatorName; + } + + public String getOperatorId() { + return operatorId; + } + + public Object getCurrentRow() { + return currentRow; + } + + public int getParentId() { + return parentId; + } + + @Override + public String toString() { + return "operatorName= " + this.getOperatorName() + + ", id=" + this.getOperatorId() + + ", parentId=" + String.valueOf(this.getParentId()); + + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java new file mode 100644 index 0000000..11394cf --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java @@ -0,0 +1,34 @@ + +package org.apache.hadoop.hive.ql.exec; +import java.util.List; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +public class OperatorHookUtils { + + public static final Log LOG = LogFactory.getLog("OperatorHookUtils"); + public static List getOperatorHooks(Configuration hconf) + throws Exception { + List opHooks = new ArrayList(); + String csOpHooks = HiveConf.getVar(hconf, HiveConf.ConfVars.OPERATORHOOKS); + if (csOpHooks == null || csOpHooks.trim().isEmpty()) { + return opHooks; + } + String[] opHookClasses = csOpHooks.split(","); + for(String opHookClass: opHookClasses) { + try { + OperatorHook opHook = + (OperatorHook) Class.forName(opHookClass.trim(), true, + JavaUtils.getClassLoader()).newInstance(); + opHooks.add(opHook); + } catch (ClassNotFoundException e) { + LOG.error(opHookClass + " Class not found: " + e.getMessage()); + } + } + return opHooks; + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java 
ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java new file mode 100644 index 0000000..3b4ce59 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java @@ -0,0 +1,12 @@ +package org.apache.hadoop.hive.ql.exec; +public class TestOperatorHook implements OperatorHook { + public int numEnters = 0; + public int numExits = 0; + public void enter(OperatorHookContext opHookContext) { + numEnters += 1; + } + public void exit(OperatorHookContext opHookContext) { + numExits += 1; + } +} + diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java index aae0b15..6fc29b2 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java @@ -18,11 +18,13 @@ package org.apache.hadoop.hive.ql.exec; +import java.util.List; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.lang.Class; import junit.framework.TestCase; @@ -140,6 +142,89 @@ public void testBaseFilterOperator() throws Throwable { } } + public void testOperatorHook() throws Throwable { + try { + System.out.println("Testing Map Operator"); + // initialize configuration + Configuration hconf = new JobConf(TestOperators.class); + HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME, + "hdfs:///testDir/testFile"); + + HiveConf.setVar(hconf, HiveConf.ConfVars.OPERATORHOOKS, + "org.apache.hadoop.hive.ql.exec.TestOperatorHook"); + // initialize pathToAliases + ArrayList aliases = new ArrayList(); + aliases.add("a"); + aliases.add("b"); + LinkedHashMap> pathToAliases = + new LinkedHashMap>(); + pathToAliases.put("/testDir", aliases); + + // initialize pathToTableInfo + // Default: treat the table as a single column "col" + TableDesc td = Utilities.defaultTd; + PartitionDesc pd = new PartitionDesc(td, null); + LinkedHashMap pathToPartitionInfo = 
+ new LinkedHashMap(); + pathToPartitionInfo.put("/testDir", pd); + + // initialize aliasToWork + CollectDesc cd = new CollectDesc(Integer.valueOf(1)); + CollectOperator cdop1 = (CollectOperator) OperatorFactory + .get(CollectDesc.class); + cdop1.setConf(cd); + CollectOperator cdop2 = (CollectOperator) OperatorFactory + .get(CollectDesc.class); + cdop2.setConf(cd); + LinkedHashMap> aliasToWork = + new LinkedHashMap>(); + aliasToWork.put("a", cdop1); + aliasToWork.put("b", cdop2); + + // initialize mapredWork + MapredWork mrwork = new MapredWork(); + mrwork.setPathToAliases(pathToAliases); + mrwork.setPathToPartitionInfo(pathToPartitionInfo); + mrwork.setAliasToWork(aliasToWork); + + // get map operator and initialize it + MapOperator mo = new MapOperator(); + mo.initializeAsRoot(hconf, mrwork); + List opHooks = OperatorHookUtils.getOperatorHooks(hconf); + mo.setOperatorHooks(opHooks); + Text tw = new Text(); + InspectableObject io1 = new InspectableObject(); + InspectableObject io2 = new InspectableObject(); + for (int i = 0; i < 5; i++) { + String answer = "[[" + i + ", " + (i + 1) + ", " + (i + 2) + "]]"; + + tw.set("" + i + "\u0001" + (i + 1) + "\u0001" + (i + 2)); + mo.process(tw); + cdop1.retrieve(io1); + cdop2.retrieve(io2); + System.out.println("io1.o.toString() = " + io1.o.toString()); + System.out.println("io2.o.toString() = " + io2.o.toString()); + System.out.println("answer.toString() = " + answer.toString()); + assertEquals(answer.toString(), io1.o.toString()); + assertEquals(answer.toString(), io2.o.toString()); + for(OperatorHook opHook : opHooks) { + if (TestOperatorHook.class.isInstance(opHook)) { + TestOperatorHook hook = (TestOperatorHook) opHook; + System.out.println(hook.numEnters); + System.out.println(hook.numExits); + assertEquals(hook.numEnters, hook.numExits); + } + + } + } + + System.out.println("Map Operator ok"); + + } catch (Throwable e) { + e.printStackTrace(); + throw (e); + } + } private void testTaskIds(String [] taskIds, String 
expectedAttemptId, String expectedTaskId) { Configuration conf = new JobConf(TestOperators.class); for (String one: taskIds) { @@ -352,7 +437,6 @@ public void testMapOperator() throws Throwable { // get map operator and initialize it MapOperator mo = new MapOperator(); mo.initializeAsRoot(hconf, mrwork); - Text tw = new Text(); InspectableObject io1 = new InspectableObject(); InspectableObject io2 = new InspectableObject(); @@ -377,4 +461,5 @@ public void testMapOperator() throws Throwable { throw (e); } } + }