Index: shims/build.xml
===================================================================
--- shims/build.xml    (revision 1438471)
+++ shims/build.xml    (working copy)
@@ -96,7 +96,7 @@
       encoding="${build.encoding}"
       srcdir="${worker.test.src.dir}"
       includes="org/apache/hadoop/**/*.java"
-      excludes="**/TestSerDe.java"
+      excludes="**/TestSerDe.java,**/TestOperatorHook.java,**/TestOperatorHookUtils.java"
       destdir="${test.build.classes}"
       debug="${javac.debug}"
       optimize="${javac.optimize}"
Index: build.xml
===================================================================
--- build.xml    (revision 1438471)
+++ build.xml    (working copy)
@@ -487,7 +487,7 @@
-
+
Index: build-common.xml
===================================================================
--- build-common.xml    (revision 1438471)
+++ build-common.xml    (working copy)
@@ -57,7 +57,7 @@
-
+
@@ -281,7 +281,7 @@
       encoding="${build.encoding}"
       srcdir="${test.src.dir}"
       includes="org/apache/hadoop/**/*.java"
-      excludes="**/TestSerDe.java"
+      excludes="**/TestSerDe.java,**/TestOperatorHook.java,**/TestOperatorHookUtils.java"
       destdir="${test.build.classes}"
       debug="${javac.debug}"
       optimize="${javac.optimize}"
@@ -444,7 +444,7 @@
+          excludes="**/TestSerDe.class,**/TestOperatorHook.class,**/TestOperatorHookUtils.class,**/TestHiveMetaStore.class,**/*$*.class,${test.junit.exclude}" />
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java    (revision 1438471)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java    (working copy)
@@ -178,6 +178,7 @@
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
     POSTEXECHOOKS("hive.exec.post.hooks", ""),
     ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
+    OPERATORHOOKS("hive.exec.operator.hooks", ""),
     CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""),
     EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
Index: ql/src/test/results/clientpositive/operatorhook.q.out
===================================================================
--- ql/src/test/results/clientpositive/operatorhook.q.out    (revision 0)
+++ ql/src/test/results/clientpositive/operatorhook.q.out    (working copy)
@@ -0,0 +1,7 @@
+PREHOOK: query: SELECT count(1) FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+TEST_OPERATOR_HOOK_EXIT: 7508
+TEST_OPERATOR_HOOK_ENTER: 7508
+500
Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java    (revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java    (working copy)
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Iterator;
+import java.io.Serializable;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.Counters;
+
+public class PostTestOperatorHook implements ExecuteWithHookContext {
+  private void logEnterExitCounters(Task<? extends Serializable> task) throws IOException {
+    if (task.getTaskHandle() != null) {
+      Counters counters = task.getTaskHandle().getCounters();
+      if (counters != null) {
+        logCounterValue(counters, "TEST_OPERATOR_HOOK_");
+      } else {
+        SessionState.getConsole().printError("counters are null");
+      }
+    } else {
+      SessionState.getConsole().printError("task handle is null");
+    }
+  }
+
+  private void logCounterValue(Counters ctr, String name) {
+    Collection<String> counterGroups = ctr.getGroupNames();
+    for (String groupName : counterGroups) {
+      Counters.Group group = ctr.getGroup(groupName);
+      Iterator<Counters.Counter> it = group.iterator();
+      while (it.hasNext()) {
+        Counters.Counter counter = it.next();
+        if (counter.getName().contains(name)) {
+          SessionState.getConsole().printError(counter.getName() + ": " + counter.getValue());
+        }
+      }
+    }
+  }
+
+  public void run(HookContext hookContext) {
+    HiveConf conf = hookContext.getConf();
+    List<TaskRunner> completedTasks = hookContext.getCompleteTaskList();
+    if (completedTasks != null) {
+      for (TaskRunner taskRunner : completedTasks) {
+        Task<? extends Serializable> task = taskRunner.getTask();
+        if (task.isMapRedTask() && !task.isMapRedLocalTask()) {
+          try {
+            logEnterExitCounters(task);
+          } catch (Exception e) {
+            SessionState.getConsole().printError("Error in get counters: " + e.toString());
+          }
+        }
+      }
+    }
+  }
+}
Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java    (revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java    (working copy)
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.HashMap;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestOperatorHook implements OperatorHook {
+  protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
+  private long enters = 0;
+  private long exits = 0;
+
+  public void enter(OperatorHookContext opHookContext) {
+    enters++;
+  }
+
+  public void exit(OperatorHookContext opHookContext) {
+    exits++;
+  }
+
+  public void close(OperatorHookContext opHookContext) {
+    incrCounter(TestOperatorHookUtils.TEST_OPERATOR_HOOK_ENTER, opHookContext, enters);
+    incrCounter(TestOperatorHookUtils.TEST_OPERATOR_HOOK_EXIT, opHookContext, exits);
+  }
+
+  private void incrCounter(String ctrName, OperatorHookContext opHookContext, long incrVal) {
+    TestOperatorHookUtils.TestOperatorHookCounter ctr =
+        TestOperatorHookUtils.TestOperatorHookCounter.valueOf(ctrName);
+    Operator op = opHookContext.getOperator();
+    LOG.info(ctrName);
+    op.reporter.incrCounter(ctr, incrVal);
+    Long val = op.reporter.getCounter(ctr).getValue();
+    LOG.info(ctrName + " " + String.valueOf(val));
+  }
+}
Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java    (revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java    (working copy)
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Collection;
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+public class TestOperatorHookUtils {
+  public static final String TEST_OPERATOR_HOOK_ENTER = "TEST_OPERATOR_HOOK_ENTER";
+  public static final String TEST_OPERATOR_HOOK_EXIT = "TEST_OPERATOR_HOOK_EXIT";
+
+  public static enum TestOperatorHookCounter {
+    TEST_OPERATOR_HOOK_ENTER,
+    TEST_OPERATOR_HOOK_EXIT,
+  }
+}
Index: ql/src/test/queries/clientpositive/operatorhook.q
===================================================================
--- ql/src/test/queries/clientpositive/operatorhook.q    (revision 0)
+++ ql/src/test/queries/clientpositive/operatorhook.q    (working copy)
@@ -0,0 +1,6 @@
+SET hive.exec.operator.hooks=org.apache.hadoop.hive.ql.exec.TestOperatorHook;
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostTestOperatorHook;
+SET hive.exec.mode.local.auto=false;
+SET hive.task.progress=true;
+
+SELECT count(1) FROM src
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java    (revision 1438471)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java    (working copy)
@@ -83,7 +83,6 @@
   protected String id;
   protected T work;
 
-
   public static enum FeedType {
     DYNAMIC_PARTITIONS, // list of dynamic partitions
   };
@@ -102,6 +101,10 @@
     taskTag = Task.NO_TAG;
   }
 
+  public TaskHandle getTaskHandle() {
+    return taskHandle;
+  }
+
   public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
     this.queryPlan = queryPlan;
     isdone = false;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java    (revision 1438471)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java    (working copy)
@@ -50,6 +50,7 @@
   private JobConf jc;
   private boolean abort = false;
   private Reporter rp;
+  private List<OperatorHook> opHooks;
   public static final Log l4j = LogFactory.getLog("ExecMapper");
   private static boolean done;
@@ -96,6 +97,7 @@
       mo.setExecContext(execContext);
       mo.initializeLocalWork(jc);
       mo.initialize(jc, null);
+      opHooks = OperatorHookUtils.getOperatorHooks(jc);
 
       if (localWork == null) {
         return;
@@ -128,6 +130,7 @@
       rp = reporter;
       mo.setOutputCollector(oc);
       mo.setReporter(rp);
+      mo.setOperatorHooks(opHooks);
     }
     // reset the execContext for each new row
     execContext.resetRow();
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java    (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java    (working copy)
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class OperatorHookUtils {
+
+  public static final Log LOG = LogFactory.getLog("OperatorHookUtils");
+
+  public static List<OperatorHook> getOperatorHooks(Configuration hconf)
+      throws Exception {
+    List<OperatorHook> opHooks = new ArrayList<OperatorHook>();
+    String csOpHooks = HiveConf.getVar(hconf, HiveConf.ConfVars.OPERATORHOOKS);
+    if (csOpHooks == null) {
+      return opHooks;
+    }
+    String[] opHookClasses = csOpHooks.split(",");
+    for (String opHookClass : opHookClasses) {
+      try {
+        OperatorHook opHook =
+            (OperatorHook) Class.forName(opHookClass.trim(), true,
+                JavaUtils.getClassLoader()).newInstance();
+        opHooks.add(opHook);
+      } catch (ClassNotFoundException e) {
+        LOG.error(opHookClass + " Class not found: " + e.getMessage());
+      }
+    }
+    return opHooks;
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java    (revision 1438471)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java    (working copy)
@@ -531,6 +531,7 @@
     jobInfo(rj);
     MapRedStats mapRedStats = progress(th);
+    this.task.taskHandle = th;
 
     // Not always there is a SessionState. Sometimes ExeDriver is directly invoked
     // for special modes. In that case, SessionState.get() is empty.
     if (SessionState.get() != null) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java    (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java    (working copy)
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+public class OperatorHookContext {
+  private String operatorName;
+  private String operatorId;
+  private Object currentRow;
+  private Operator operator;
+
+  public OperatorHookContext(Operator op, Object row) {
+    this(op.getName(), op.getIdentifier(), row);
+    this.operator = op;
+  }
+
+  private OperatorHookContext(String opName, String opId, Object row) {
+    operatorName = opName;
+    operatorId = opId;
+    currentRow = row;
+  }
+
+  public Operator getOperator() {
+    return operator;
+  }
+
+  public String getOperatorName() {
+    return operatorName;
+  }
+
+  public String getOperatorId() {
+    return operatorId;
+  }
+
+  public Object getCurrentRow() {
+    return currentRow;
+  }
+
+  @Override
+  public String toString() {
+    return "operatorName= " + this.getOperatorName()
+        + ", id=" + this.getOperatorId();
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java    (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java    (working copy)
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * OperatorHook is an interface whose implementations are invoked before and
+ * after each Operator's row processing in a map/reduce stage, and once when
+ * the operator is closed.
+ */
+public interface OperatorHook {
+  public void enter(OperatorHookContext opHookContext) throws HiveException;
+  public void exit(OperatorHookContext opHookContext) throws HiveException;
+  public void close(OperatorHookContext opHookContext) throws HiveException;
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java    (revision 1438471)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java    (working copy)
@@ -53,7 +53,9 @@
   // Bean methods
 
   private static final long serialVersionUID = 1L;
+  List<OperatorHook> operatorHooks;
 
+  private Configuration configuration;
   protected List> childOperators;
   protected List> parentOperators;
   protected String operatorId;
@@ -129,6 +131,9 @@
     this.childOperators = childOperators;
   }
 
+  public Configuration getConfiguration() {
+    return configuration;
+  }
   public List> getChildOperators() {
     return childOperators;
   }
@@ -226,6 +231,17 @@
     return id;
   }
 
+  public void setOperatorHooks(List<OperatorHook> opHooks) {
+    operatorHooks = opHooks;
+    if (childOperators == null) {
+      return;
+    }
+
+    for (Operator op : childOperators) {
+      op.setOperatorHooks(opHooks);
+    }
+  }
+
   public void setReporter(Reporter rep) {
     reporter = rep;
@@ -313,6 +329,7 @@
       return;
     }
 
+    this.configuration = hconf;
     this.out = null;
     if (!areAllParentsInitialized()) {
       return;
@@ -409,6 +426,34 @@
     }
   }
 
+  private void enterOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
+    if (this.operatorHooks == null) {
+      return;
+    }
+    for (OperatorHook opHook : this.operatorHooks) {
+      opHook.enter(opHookContext);
+    }
+  }
+
+  private void exitOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
+    if (this.operatorHooks == null) {
+      return;
+    }
+    for (OperatorHook opHook : this.operatorHooks) {
+      opHook.exit(opHookContext);
+    }
+  }
+
+  private void closeOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
+    if (this.operatorHooks == null) {
+      return;
+    }
+    for (OperatorHook opHook : this.operatorHooks) {
+      opHook.close(opHookContext);
+    }
+  }
+
+
   /**
    * Collects all the parent's output object inspectors and calls actual
    * initialization method.
@@ -470,8 +515,11 @@
     if (fatalError) {
       return;
     }
+    OperatorHookContext opHookContext = new OperatorHookContext(this, row);
     preProcessCounter();
+    enterOperatorHooks(opHookContext);
     processOp(row, tag);
+    exitOperatorHooks(opHookContext);
     postProcessCounter();
   }
 
@@ -554,6 +602,7 @@
     LOG.info(id + " forwarded " + cntr + " rows");
 
+    closeOperatorHooks(new OperatorHookContext(this, null));
     // call the operator specific close routine
     closeOp(abort);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java    (revision 1438471)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java    (working copy)
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,6 +66,7 @@
   private long nextCntr = 1;
 
   private static String[] fieldNames;
+  private List<OperatorHook> opHooks;
   public static final Log l4j = LogFactory.getLog("ExecReducer");
   private boolean isLogInfoEnabled = false;
@@ -148,6 +150,7 @@
     try {
       l4j.info(reducer.dump(0));
       reducer.initialize(jc, rowObjectInspector);
+      opHooks = OperatorHookUtils.getOperatorHooks(jc);
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
@@ -178,6 +181,7 @@
       rp = reporter;
       reducer.setOutputCollector(oc);
       reducer.setReporter(rp);
+      reducer.setOperatorHooks(opHooks);
     }
 
     try {
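
For reviewers, a minimal sketch of a user-defined hook written against the OperatorHook interface added by this patch; it is not part of the patch, and the class name RowCountingOperatorHook is hypothetical. Because ExecMapper/ExecReducer build one hook list per task and Operator.setOperatorHooks() pushes the same instances to every child operator, per-operator state is keyed by operator id here. OperatorHookUtils.getOperatorHooks() instantiates hooks reflectively, so a hook needs a public no-argument constructor, and it would be enabled with SET hive.exec.operator.hooks=org.apache.hadoop.hive.ql.exec.RowCountingOperatorHook;

package org.apache.hadoop.hive.ql.exec;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;

// Hypothetical example hook (not part of this patch): counts rows per operator.
public class RowCountingOperatorHook implements OperatorHook {
  private static final Log LOG = LogFactory.getLog("RowCountingOperatorHook");

  // One hook instance is shared by all operators in the task, so key state by operator id.
  private final Map<String, Long> rowsPerOperator = new HashMap<String, Long>();

  public void enter(OperatorHookContext opHookContext) throws HiveException {
    // enter() fires once per row, just before processOp().
    String id = opHookContext.getOperatorId();
    Long seen = rowsPerOperator.get(id);
    rowsPerOperator.put(id, seen == null ? 1L : seen + 1L);
  }

  public void exit(OperatorHookContext opHookContext) throws HiveException {
    // exit() fires after processOp() returns for the same row; nothing to do here.
  }

  public void close(OperatorHookContext opHookContext) throws HiveException {
    // close() fires once per operator when the operator shuts down.
    Long seen = rowsPerOperator.get(opHookContext.getOperatorId());
    LOG.info(opHookContext.getOperatorName() + "[" + opHookContext.getOperatorId()
        + "] saw " + (seen == null ? 0L : seen) + " rows");
  }
}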