diff --git build-common.xml build-common.xml index f1a409c..bbaa070 100644 --- build-common.xml +++ build-common.xml @@ -57,7 +57,7 @@ - + diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 15fce1e..a42e9f1 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -178,6 +178,7 @@ PREEXECHOOKS("hive.exec.pre.hooks", ""), POSTEXECHOOKS("hive.exec.post.hooks", ""), ONFAILUREHOOKS("hive.exec.failure.hooks", ""), + OPERATORHOOKS("hive.exec.operator.hooks", ""), CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""), EXECPARALLEL("hive.exec.parallel", false), // parallel query launching EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java index 9a8c237..8b2866f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java @@ -50,6 +50,7 @@ private JobConf jc; private boolean abort = false; private Reporter rp; + private List opHooks; public static final Log l4j = LogFactory.getLog("ExecMapper"); private static boolean done; @@ -96,6 +97,7 @@ public void configure(JobConf job) { mo.setExecContext(execContext); mo.initializeLocalWork(jc); mo.initialize(jc, null); + opHooks = OperatorHookUtils.getOperatorHooks(jc); if (localWork == null) { return; @@ -130,6 +132,7 @@ public void map(Object key, Object value, OutputCollector output, rp = reporter; mo.setOutputCollector(oc); mo.setReporter(rp); + mo.setOperatorHooks(opHooks); } // reset the execContext for each new row execContext.resetRow(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java index 18a9bd2..e63b5ce 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +66,7 @@ private long nextCntr = 1; private static String[] fieldNames; + private List opHooks; public static final Log l4j = LogFactory.getLog("ExecReducer"); private boolean isLogInfoEnabled = false; @@ -148,6 +150,7 @@ public void configure(JobConf job) { try { l4j.info(reducer.dump(0)); reducer.initialize(jc, rowObjectInspector); + opHooks = OperatorHookUtils.getOperatorHooks(jc); } catch (Throwable e) { abort = true; if (e instanceof OutOfMemoryError) { @@ -178,6 +181,7 @@ public void reduce(Object key, Iterator values, OutputCollector output, rp = reporter; reducer.setOutputCollector(oc); reducer.setReporter(rp); + reducer.setOperatorHooks(opHooks); } try { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java index 83cf653..effc540 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java @@ -531,6 +531,7 @@ public int progress(RunningJob rj, JobClient jc) throws IOException { jobInfo(rj); MapRedStats mapRedStats = progress(th); + this.task.taskHandle = th; // Not always there is a SessionState. Sometimes ExeDriver is directly invoked // for special modes. In that case, SessionState.get() is empty. 
if (SessionState.get() != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 68302f8..84d6fc1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -53,7 +53,9 @@ // Bean methods private static final long serialVersionUID = 1L; + List operatorHooks; + private Configuration configuration; protected List> childOperators; protected List> parentOperators; protected String operatorId; @@ -129,6 +131,9 @@ public void setChildOperators( this.childOperators = childOperators; } + public Configuration getConfiguration() { + return configuration; + } public List> getChildOperators() { return childOperators; } @@ -226,6 +231,17 @@ public String getIdentifier() { return id; } + public void setOperatorHooks(List opHooks){ + operatorHooks = opHooks; + if (childOperators == null) { + return; + } + + for (Operator op : childOperators) { + op.setOperatorHooks(opHooks); + } + } + public void setReporter(Reporter rep) { reporter = rep; @@ -313,6 +329,7 @@ public void initialize(Configuration hconf, ObjectInspector[] inputOIs) return; } + this.configuration = hconf; this.out = null; if (!areAllParentsInitialized()) { return; @@ -409,6 +426,24 @@ public void passExecContext(ExecMapperContext execContext) { } } + private void enterOperatorHooks(OperatorHookContext opHookContext) throws HiveException { + if (this.operatorHooks == null) { + return; + } + for(OperatorHook opHook : this.operatorHooks) { + opHook.enter(opHookContext); + } + } + + private void exitOperatorHooks(OperatorHookContext opHookContext) throws HiveException { + if (this.operatorHooks == null) { + return; + } + for(OperatorHook opHook : this.operatorHooks) { + opHook.exit(opHookContext); + } + } + /** * Collects all the parent's output object inspectors and calls actual * initialization method. 
@@ -470,8 +505,11 @@ public void process(Object row, int tag) throws HiveException { if (fatalError) { return; } + OperatorHookContext opHookContext = new OperatorHookContext(this, row); preProcessCounter(); + enterOperatorHooks(opHookContext); processOp(row, tag); + exitOperatorHooks(opHookContext); postProcessCounter(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java new file mode 100644 index 0000000..54b325c --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +/** + * OperatorHook is an interface whose enter/exit callbacks + * run before and after each Operator execution in a map/reduce stage + */ + +public interface OperatorHook { + public void enter(OperatorHookContext opHookContext) throws HiveException; + public void exit(OperatorHookContext opHookContext) throws HiveException; +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java new file mode 100644 index 0000000..bb40f29 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; + +public class OperatorHookContext { + private String operatorName; + private String operatorId; + private Object currentRow; + private Operator operator; + public OperatorHookContext(Operator op, Object row) { + this(op.getName(), op.getIdentifier(), row); + this.operator = op; + } + + private OperatorHookContext(String opName, String opId, Object row) { + operatorName = opName; + operatorId = opId; + currentRow = row; + } + + public Operator getOperator() { + return operator; + } + + public String getOperatorName() { + return operatorName; + } + + public String getOperatorId() { + return operatorId; + } + + public Object getCurrentRow() { + return currentRow; + } + + @Override + public String toString() { + return "operatorName= " + this.getOperatorName() + + ", id=" + this.getOperatorId(); + + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java new file mode 100644 index 0000000..d7be357 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec; +import java.util.List; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +public class OperatorHookUtils { + + public static final Log LOG = LogFactory.getLog("OperatorHookUtils"); + public static List getOperatorHooks(Configuration hconf) + throws Exception { + List opHooks = new ArrayList(); + String csOpHooks = HiveConf.getVar(hconf, HiveConf.ConfVars.OPERATORHOOKS); + if(csOpHooks == null) { + return opHooks; + } + String[] opHookClasses = csOpHooks.split(","); + for(String opHookClass: opHookClasses) { + try { + OperatorHook opHook = + (OperatorHook) Class.forName(opHookClass.trim(), true, + JavaUtils.getClassLoader()).newInstance(); + opHooks.add(opHook); + } catch (ClassNotFoundException e) { + LOG.error(opHookClass + " Class not found: " + e.getMessage()); + } + } + return opHooks; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index b380a69..9409c2a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -83,7 +83,6 @@ protected String id; protected T work; - public static enum FeedType { DYNAMIC_PARTITIONS, // list of dynamic partitions }; @@ -102,6 +101,10 @@ public Task() { taskTag = Task.NO_TAG; } + public TaskHandle getTaskHandle() { + return taskHandle; + } + public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) { this.queryPlan = queryPlan; isdone = false; diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java new file mode 100644 index 0000000..6414693 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java 
@@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec; +import java.util.HashMap; + +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +public class TestOperatorHook implements OperatorHook { + protected transient Log LOG = LogFactory.getLog(this.getClass().getName()); + + public void enter(OperatorHookContext opHookContext) { + incrCounter(TestOperatorHookUtils.TEST_OPERATOR_HOOK_ENTER, opHookContext); + } + + public void exit(OperatorHookContext opHookContext) { + incrCounter(TestOperatorHookUtils.TEST_OPERATOR_HOOK_EXIT, opHookContext); + } + + private void incrCounter(String ctrName, OperatorHookContext opHookContext) { + TestOperatorHookUtils.TestOperatorHookCounter ctr = + TestOperatorHookUtils.TestOperatorHookCounter.valueOf(ctrName); + Operator op = opHookContext.getOperator(); + LOG.info(ctrName); + op.reporter.incrCounter(ctr, 1); + Long val = op.reporter.getCounter(ctr).getValue(); + LOG.info(ctrName + " " + String.valueOf(val)); + } +} + diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java 
ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java new file mode 100644 index 0000000..1218ded --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.util.Collection; +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +public class TestOperatorHookUtils { + public static final String TEST_OPERATOR_HOOK_ENTER = "TEST_OPERATOR_HOOK_ENTER"; + public static final String TEST_OPERATOR_HOOK_EXIT = "TEST_OPERATOR_HOOK_EXIT"; + + public static enum TestOperatorHookCounter { + TEST_OPERATOR_HOOK_ENTER, + TEST_OPERATOR_HOOK_EXIT, + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java new file mode 100644 index 0000000..b931209 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.hooks; + +import java.util.Collection; +import java.util.List; +import java.util.Iterator; +import java.io.Serializable; +import java.io.IOException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskRunner; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.mapred.Counters; + +public class PostTestOperatorHook implements ExecuteWithHookContext { + private void logEnterExitCounters(Task task) throws IOException { + if(task.getTaskHandle() != null) { + Counters counters = task.getTaskHandle().getCounters(); + if(counters != null) { + logCounterValue(counters, "TEST_OPERATOR_HOOK_"); + } else { + SessionState.getConsole().printError("counters are null"); + } + } else { + SessionState.getConsole().printError("task handle is null"); + } + } + + private void logCounterValue(Counters ctr, String name) { + Collection counterGroups = ctr.getGroupNames(); + for (String groupName : counterGroups) { + Counters.Group group = ctr.getGroup(groupName); + Iterator it = group.iterator(); + while (it.hasNext()) { + Counters.Counter counter = it.next(); + 
if(counter.getName().contains(name)) { + SessionState.getConsole().printError(counter.getName() + ": " + counter.getValue()); + } + } + } + } + + public void run(HookContext hookContext) { + HiveConf conf = hookContext.getConf(); + List completedTasks = hookContext.getCompleteTaskList(); + if (completedTasks != null) { + for (TaskRunner taskRunner : completedTasks) { + Task task = taskRunner.getTask(); + if (task.isMapRedTask() && !task.isMapRedLocalTask()) { + try { + logEnterExitCounters(task); + + } catch (Exception e) { + SessionState.getConsole().printError("Error in get counters: " + e.toString()); + } + } + } + } + } +} diff --git ql/src/test/queries/clientpositive/operatorhook.q ql/src/test/queries/clientpositive/operatorhook.q new file mode 100644 index 0000000..0f2cdd8 --- /dev/null +++ ql/src/test/queries/clientpositive/operatorhook.q @@ -0,0 +1,6 @@ +SET hive.exec.operator.hooks=org.apache.hadoop.hive.ql.exec.TestOperatorHook; +SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostTestOperatorHook; +SET hive.exec.mode.local.auto=false; +SET hive.task.progress=true; + +SELECT count(1) FROM src diff --git ql/src/test/results/clientpositive/operatorhook.q.out ql/src/test/results/clientpositive/operatorhook.q.out new file mode 100644 index 0000000..1fdf66b --- /dev/null +++ ql/src/test/results/clientpositive/operatorhook.q.out @@ -0,0 +1,7 @@ +PREHOOK: query: SELECT count(1) FROM src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +TEST_OPERATOR_HOOK_EXIT: 1504 +TEST_OPERATOR_HOOK_ENTER: 1504 +500