diff --git build-common.xml build-common.xml
index b88ddbc..ce3a9de 100644
--- build-common.xml
+++ build-common.xml
@@ -57,7 +57,7 @@
-
+
diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 15fce1e..a42e9f1 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -178,6 +178,7 @@
PREEXECHOOKS("hive.exec.pre.hooks", ""),
POSTEXECHOOKS("hive.exec.post.hooks", ""),
ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
+ OPERATORHOOKS("hive.exec.operator.hooks", ""), // comma-separated OperatorHook class names
CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""),
EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
index 9a8c237..8b2866f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
@@ -50,6 +50,7 @@
private JobConf jc;
private boolean abort = false;
private Reporter rp;
+ private List<OperatorHook> opHooks; // loaded in configure(), installed on the operator tree in map()
public static final Log l4j = LogFactory.getLog("ExecMapper");
private static boolean done;
@@ -96,6 +97,7 @@ public void configure(JobConf job) {
mo.setExecContext(execContext);
mo.initializeLocalWork(jc);
mo.initialize(jc, null);
+ opHooks = OperatorHookUtils.getOperatorHooks(jc);
if (localWork == null) {
return;
@@ -130,6 +132,7 @@ public void map(Object key, Object value, OutputCollector output,
rp = reporter;
mo.setOutputCollector(oc);
mo.setReporter(rp);
+ mo.setOperatorHooks(opHooks);
}
// reset the execContext for each new row
execContext.resetRow();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
index 18a9bd2..e63b5ce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,6 +66,7 @@
private long nextCntr = 1;
private static String[] fieldNames;
+ private List<OperatorHook> opHooks; // loaded in configure(), installed on the reducer tree in reduce()
public static final Log l4j = LogFactory.getLog("ExecReducer");
private boolean isLogInfoEnabled = false;
@@ -148,6 +150,7 @@ public void configure(JobConf job) {
try {
l4j.info(reducer.dump(0));
reducer.initialize(jc, rowObjectInspector);
+ opHooks = OperatorHookUtils.getOperatorHooks(jc);
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
@@ -178,6 +181,7 @@ public void reduce(Object key, Iterator values, OutputCollector output,
rp = reporter;
reducer.setOutputCollector(oc);
reducer.setReporter(rp);
+ reducer.setOperatorHooks(opHooks);
}
try {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
index 83cf653..effc540 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
@@ -531,6 +531,7 @@ public int progress(RunningJob rj, JobClient jc) throws IOException {
jobInfo(rj);
MapRedStats mapRedStats = progress(th);
+ this.task.taskHandle = th; // exposed via Task.getTaskHandle(), e.g. for post-exec hooks reading job counters
// Not always there is a SessionState. Sometimes ExeDriver is directly invoked
// for special modes. In that case, SessionState.get() is empty.
if (SessionState.get() != null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 68302f8..84d6fc1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -53,7 +53,10 @@
// Bean methods
private static final long serialVersionUID = 1L;
+  // Runtime-only state set while the task runs; transient so Java serialization skips it.
+  transient List<OperatorHook> operatorHooks;
+  private transient Configuration configuration;
protected List> childOperators;
protected List> parentOperators;
protected String operatorId;
@@ -129,6 +132,10 @@ public void setChildOperators(
this.childOperators = childOperators;
}
+  /** Returns the Configuration recorded by initialize(); null until it has been recorded. */
+  public Configuration getConfiguration() {
+    return configuration;
+  }
public List> getChildOperators() {
return childOperators;
}
@@ -226,6 +233,20 @@ public String getIdentifier() {
return id;
}
+  /**
+   * Installs the given operator hooks on this operator and, recursively, on all of its children.
+   */
+  public void setOperatorHooks(List<OperatorHook> opHooks){
+    operatorHooks = opHooks;
+    if (childOperators == null) {
+      return;
+    }
+
+    for (Operator<? extends OperatorDesc> op : childOperators) {
+      op.setOperatorHooks(opHooks);
+    }
+  }
+
public void setReporter(Reporter rep) {
reporter = rep;
@@ -313,6 +334,7 @@ public void initialize(Configuration hconf, ObjectInspector[] inputOIs)
return;
}
+    this.configuration = hconf;
this.out = null;
if (!areAllParentsInitialized()) {
return;
@@ -409,6 +431,26 @@ public void passExecContext(ExecMapperContext execContext) {
}
}
+  /** Invokes {@link OperatorHook#enter} on every installed hook, in list order. */
+  private void enterOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
+    if (this.operatorHooks == null) {
+      return;
+    }
+    for (OperatorHook opHook : this.operatorHooks) {
+      opHook.enter(opHookContext);
+    }
+  }
+
+  /** Invokes {@link OperatorHook#exit} on every installed hook, in list order. */
+  private void exitOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
+    if (this.operatorHooks == null) {
+      return;
+    }
+    for (OperatorHook opHook : this.operatorHooks) {
+      opHook.exit(opHookContext);
+    }
+  }
+
/**
* Collects all the parent's output object inspectors and calls actual
* initialization method.
@@ -470,8 +512,15 @@ public void process(Object row, int tag) throws HiveException {
if (fatalError) {
return;
}
+    // process() runs once per row, so only allocate a hook context when hooks are installed.
+    OperatorHookContext opHookContext = null;
+    if (operatorHooks != null && !operatorHooks.isEmpty()) {
+      opHookContext = new OperatorHookContext(this, row);
+    }
preProcessCounter();
+    enterOperatorHooks(opHookContext);
processOp(row, tag);
+    exitOperatorHooks(opHookContext);
postProcessCounter();
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java
new file mode 100644
index 0000000..54b325c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * A hook that is invoked around each row an {@link Operator} processes in a map/reduce task.
+ *
+ * <p>Hooks are listed in {@code hive.exec.operator.hooks} and are instantiated reflectively,
+ * so implementations need a public no-argument constructor.
+ */
+public interface OperatorHook {
+
+  /**
+   * Called before the operator processes a row.
+   *
+   * @param opHookContext the operator and the row about to be processed
+   * @throws HiveException propagated to the caller of {@link Operator#process}
+   */
+  public void enter(OperatorHookContext opHookContext) throws HiveException;
+
+  /**
+   * Called after the operator has processed a row without throwing.
+   *
+   * @param opHookContext the same context instance that was passed to {@link #enter}
+   * @throws HiveException propagated to the caller of {@link Operator#process}
+   */
+  public void exit(OperatorHookContext opHookContext) throws HiveException;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
new file mode 100644
index 0000000..bb40f29
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+/**
+ * Describes the operator and row for a single {@link OperatorHook#enter}/{@link OperatorHook#exit}
+ * call.
+ */
+public class OperatorHookContext {
+  private final String operatorName;
+  private final String operatorId;
+  private final Object currentRow;
+  private final Operator operator;
+
+  /**
+   * Captures the operator's name and identifier along with the row it is processing.
+   *
+   * @param op the operator processing the row; must not be null
+   * @param row the row being processed
+   */
+  public OperatorHookContext(Operator op, Object row) {
+    this.operator = op;
+    this.operatorName = op.getName();
+    this.operatorId = op.getIdentifier();
+    this.currentRow = row;
+  }
+
+  /** Returns the operator whose hooks are being invoked. */
+  public Operator getOperator() {
+    return operator;
+  }
+
+  /** Returns the operator name, as reported by {@link Operator#getName()} at construction. */
+  public String getOperatorName() {
+    return operatorName;
+  }
+
+  /** Returns the operator id, as reported by {@link Operator#getIdentifier()} at construction. */
+  public String getOperatorId() {
+    return operatorId;
+  }
+
+  /** Returns the row being processed. */
+  public Object getCurrentRow() {
+    return currentRow;
+  }
+
+  @Override
+  public String toString() {
+    return "operatorName= " + this.getOperatorName() +
+        ", id=" + this.getOperatorId();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java
new file mode 100644
index 0000000..d7be357
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+/**
+ * Loads the {@link OperatorHook} implementations named in {@code hive.exec.operator.hooks}.
+ */
+public class OperatorHookUtils {
+
+  public static final Log LOG = LogFactory.getLog("OperatorHookUtils");
+
+  /**
+   * Instantiates each class listed, comma-separated, in {@code hive.exec.operator.hooks}.
+   *
+   * <p>Blank entries, including the empty default value, are skipped. A class that cannot
+   * be found is logged and skipped.
+   *
+   * @param hconf configuration to read the hook list from
+   * @return the instantiated hooks in configuration order; never null, possibly empty
+   * @throws Exception if a listed class is found but cannot be instantiated, or does not
+   *     implement {@link OperatorHook}
+   */
+  public static List<OperatorHook> getOperatorHooks(Configuration hconf)
+      throws Exception {
+    List<OperatorHook> opHooks = new ArrayList<OperatorHook>();
+    String csOpHooks = HiveConf.getVar(hconf, HiveConf.ConfVars.OPERATORHOOKS);
+    if (csOpHooks == null) {
+      return opHooks;
+    }
+    for (String opHookClass : csOpHooks.split(",")) {
+      String className = opHookClass.trim();
+      // An empty name (default "" or a stray comma) can never name a class; skip it quietly.
+      if (className.isEmpty()) {
+        continue;
+      }
+      try {
+        OperatorHook opHook = (OperatorHook) Class.forName(className, true,
+            JavaUtils.getClassLoader()).newInstance();
+        opHooks.add(opHook);
+      } catch (ClassNotFoundException e) {
+        LOG.error(className + " Class not found: " + e.getMessage());
+      }
+    }
+    return opHooks;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index b380a69..9409c2a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -102,6 +102,13 @@ public Task() {
taskTag = Task.NO_TAG;
}
+  /**
+   * Returns the handle recorded for the job this task launched; may be null if none was recorded.
+   */
+  public TaskHandle getTaskHandle() {
+    return taskHandle;
+  }
+
public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
this.queryPlan = queryPlan;
isdone = false;
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java
new file mode 100644
index 0000000..6414693
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+import java.util.HashMap;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Test {@link OperatorHook} that counts its enter/exit calls in the task counters
+ * {@code TEST_OPERATOR_HOOK_ENTER} and {@code TEST_OPERATOR_HOOK_EXIT}.
+ */
+public class TestOperatorHook implements OperatorHook {
+  protected static final Log LOG = LogFactory.getLog(TestOperatorHook.class.getName());
+
+  @Override
+  public void enter(OperatorHookContext opHookContext) {
+    incrCounter(TestOperatorHookUtils.TEST_OPERATOR_HOOK_ENTER, opHookContext);
+  }
+
+  @Override
+  public void exit(OperatorHookContext opHookContext) {
+    incrCounter(TestOperatorHookUtils.TEST_OPERATOR_HOOK_EXIT, opHookContext);
+  }
+
+  /** Increments the named counter through the reporter of the context's operator. */
+  private void incrCounter(String ctrName, OperatorHookContext opHookContext) {
+    TestOperatorHookUtils.TestOperatorHookCounter ctr =
+        TestOperatorHookUtils.TestOperatorHookCounter.valueOf(ctrName);
+    Operator op = opHookContext.getOperator();
+    LOG.info(ctrName);
+    op.reporter.incrCounter(ctr, 1);
+    long val = op.reporter.getCounter(ctr).getValue();
+    LOG.info(ctrName + " " + val);
+  }
+}
+
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java
new file mode 100644
index 0000000..1218ded
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Collection;
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+/**
+ * Counter definitions shared with {@link TestOperatorHook}.
+ */
+public class TestOperatorHookUtils {
+  // Derived from the enum so TestOperatorHookCounter.valueOf(name) always resolves.
+  public static final String TEST_OPERATOR_HOOK_ENTER =
+      TestOperatorHookCounter.TEST_OPERATOR_HOOK_ENTER.name();
+  public static final String TEST_OPERATOR_HOOK_EXIT =
+      TestOperatorHookCounter.TEST_OPERATOR_HOOK_EXIT.name();
+
+  /** Task counters incremented by {@link TestOperatorHook}. */
+  public static enum TestOperatorHookCounter {
+    TEST_OPERATOR_HOOK_ENTER,
+    TEST_OPERATOR_HOOK_EXIT,
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java
new file mode 100644
index 0000000..b931209
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Iterator;
+import java.io.Serializable;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.Counters;
+
+/**
+ * Post-execution hook that prints every TEST_OPERATOR_HOOK_* counter of each completed
+ * non-local map-reduce task to the console error stream, so q-file tests can check them.
+ */
+public class PostTestOperatorHook implements ExecuteWithHookContext {
+
+  /** Prints the TEST_OPERATOR_HOOK_ counters of the task's job, or why they are unavailable. */
+  private void logEnterExitCounters(Task<? extends Serializable> task) throws IOException {
+    if (task.getTaskHandle() != null) {
+      Counters counters = task.getTaskHandle().getCounters();
+      if (counters != null) {
+        logCounterValue(counters, "TEST_OPERATOR_HOOK_");
+      } else {
+        SessionState.getConsole().printError("counters are null");
+      }
+    } else {
+      SessionState.getConsole().printError("task handle is null");
+    }
+  }
+
+  /** Prints "counterName: value" for every counter whose name contains the given substring. */
+  private void logCounterValue(Counters ctr, String name) {
+    Collection<String> counterGroups = ctr.getGroupNames();
+    for (String groupName : counterGroups) {
+      Counters.Group group = ctr.getGroup(groupName);
+      Iterator<Counters.Counter> it = group.iterator();
+      while (it.hasNext()) {
+        Counters.Counter counter = it.next();
+        if (counter.getName().contains(name)) {
+          SessionState.getConsole().printError(counter.getName() + ": " + counter.getValue());
+        }
+      }
+    }
+  }
+
+  @Override
+  public void run(HookContext hookContext) {
+    List<TaskRunner> completedTasks = hookContext.getCompleteTaskList();
+    if (completedTasks != null) {
+      for (TaskRunner taskRunner : completedTasks) {
+        Task<? extends Serializable> task = taskRunner.getTask();
+        if (task.isMapRedTask() && !task.isMapRedLocalTask()) {
+          try {
+            logEnterExitCounters(task);
+          } catch (Exception e) {
+            SessionState.getConsole().printError("Error in get counters: " + e.toString());
+          }
+        }
+      }
+    }
+  }
+}
diff --git ql/src/test/queries/clientpositive/operatorhook.q ql/src/test/queries/clientpositive/operatorhook.q
new file mode 100644
index 0000000..0f2cdd8
--- /dev/null
+++ ql/src/test/queries/clientpositive/operatorhook.q
@@ -0,0 +1,6 @@
+SET hive.exec.operator.hooks=org.apache.hadoop.hive.ql.exec.TestOperatorHook;
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostTestOperatorHook;
+SET hive.exec.mode.local.auto=false;
+SET hive.task.progress=true;
+
+SELECT count(1) FROM src
diff --git ql/src/test/results/clientpositive/operatorhook.q.out ql/src/test/results/clientpositive/operatorhook.q.out
new file mode 100644
index 0000000..1fdf66b
--- /dev/null
+++ ql/src/test/results/clientpositive/operatorhook.q.out
@@ -0,0 +1,7 @@
+PREHOOK: query: SELECT count(1) FROM src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+TEST_OPERATOR_HOOK_EXIT: 1504
+TEST_OPERATOR_HOOK_ENTER: 1504
+500