diff --git build-common.xml build-common.xml
index b88ddbc..6b7cc0f 100644
--- build-common.xml
+++ build-common.xml
@@ -57,7 +57,7 @@
-
+
diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 15fce1e..a42e9f1 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -178,6 +178,7 @@
PREEXECHOOKS("hive.exec.pre.hooks", ""),
POSTEXECHOOKS("hive.exec.post.hooks", ""),
ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
+ OPERATORHOOKS("hive.exec.operator.hooks", ""),
CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""),
EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
index 9a8c237..8b2866f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
@@ -50,6 +50,7 @@
private JobConf jc;
private boolean abort = false;
private Reporter rp;
+ private List opHooks;
public static final Log l4j = LogFactory.getLog("ExecMapper");
private static boolean done;
@@ -96,6 +97,7 @@ public void configure(JobConf job) {
mo.setExecContext(execContext);
mo.initializeLocalWork(jc);
mo.initialize(jc, null);
+ opHooks = OperatorHookUtils.getOperatorHooks(jc);
if (localWork == null) {
return;
@@ -130,6 +132,7 @@ public void map(Object key, Object value, OutputCollector output,
rp = reporter;
mo.setOutputCollector(oc);
mo.setReporter(rp);
+ mo.setOperatorHooks(opHooks);
}
// reset the execContext for each new row
execContext.resetRow();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
index 18a9bd2..e63b5ce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,6 +66,7 @@
private long nextCntr = 1;
private static String[] fieldNames;
+ private List opHooks;
public static final Log l4j = LogFactory.getLog("ExecReducer");
private boolean isLogInfoEnabled = false;
@@ -148,6 +150,7 @@ public void configure(JobConf job) {
try {
l4j.info(reducer.dump(0));
reducer.initialize(jc, rowObjectInspector);
+ opHooks = OperatorHookUtils.getOperatorHooks(jc);
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
@@ -178,6 +181,7 @@ public void reduce(Object key, Iterator values, OutputCollector output,
rp = reporter;
reducer.setOutputCollector(oc);
reducer.setReporter(rp);
+ reducer.setOperatorHooks(opHooks);
}
try {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
index 83cf653..effc540 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
@@ -531,6 +531,7 @@ public int progress(RunningJob rj, JobClient jc) throws IOException {
jobInfo(rj);
MapRedStats mapRedStats = progress(th);
+ this.task.taskHandle = th;
// Not always there is a SessionState. Sometimes ExeDriver is directly invoked
// for special modes. In that case, SessionState.get() is empty.
if (SessionState.get() != null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 68302f8..84d6fc1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -53,7 +53,9 @@
// Bean methods
private static final long serialVersionUID = 1L;
+ List operatorHooks;
+ private Configuration configuration;
protected List> childOperators;
protected List> parentOperators;
protected String operatorId;
@@ -129,6 +131,9 @@ public void setChildOperators(
this.childOperators = childOperators;
}
  /**
   * Returns the Configuration this operator was initialized with
   * (captured in initialize()); null before initialization.
   */
  public Configuration getConfiguration() {
    return configuration;
  }
public List> getChildOperators() {
return childOperators;
}
@@ -226,6 +231,17 @@ public String getIdentifier() {
return id;
}
+ public void setOperatorHooks(List opHooks){
+ operatorHooks = opHooks;
+ if (childOperators == null) {
+ return;
+ }
+
+ for (Operator extends OperatorDesc> op : childOperators) {
+ op.setOperatorHooks(opHooks);
+ }
+ }
+
public void setReporter(Reporter rep) {
reporter = rep;
@@ -313,6 +329,7 @@ public void initialize(Configuration hconf, ObjectInspector[] inputOIs)
return;
}
+ this.configuration = hconf;
this.out = null;
if (!areAllParentsInitialized()) {
return;
@@ -409,6 +426,24 @@ public void passExecContext(ExecMapperContext execContext) {
}
}
+ private void enterOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
+ if (this.operatorHooks == null) {
+ return;
+ }
+ for(OperatorHook opHook : this.operatorHooks) {
+ opHook.enter(opHookContext);
+ }
+ }
+
+ private void exitOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
+ if (this.operatorHooks == null) {
+ return;
+ }
+ for(OperatorHook opHook : this.operatorHooks) {
+ opHook.exit(opHookContext);
+ }
+ }
+
/**
* Collects all the parent's output object inspectors and calls actual
* initialization method.
@@ -470,8 +505,11 @@ public void process(Object row, int tag) throws HiveException {
if (fatalError) {
return;
}
+ OperatorHookContext opHookContext = new OperatorHookContext(this, row);
preProcessCounter();
+ enterOperatorHooks(opHookContext);
processOp(row, tag);
+ exitOperatorHooks(opHookContext);
postProcessCounter();
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java
new file mode 100644
index 0000000..bc98b10
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java
@@ -0,0 +1,12 @@
package org.apache.hadoop.hive.ql.exec;

import org.apache.hadoop.hive.ql.metadata.HiveException;

/**
 * Callback interface invoked around each row an Operator processes in a
 * map/reduce stage: enter() runs immediately before the operator's
 * processOp() call and exit() immediately after it (see Operator.process()).
 * Hooks are configured via hive.exec.operator.hooks.
 */
public interface OperatorHook {
  /**
   * Called immediately before the operator processes a row.
   *
   * @param opHookContext describes the operator and the row about to be processed
   * @throws HiveException on failure; propagates to the caller of Operator.process()
   */
  public void enter(OperatorHookContext opHookContext) throws HiveException;

  /**
   * Called immediately after the operator has processed a row.
   *
   * @param opHookContext describes the operator and the row just processed
   * @throws HiveException on failure; propagates to the caller of Operator.process()
   */
  public void exit(OperatorHookContext opHookContext) throws HiveException;
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
new file mode 100644
index 0000000..9eb559f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
@@ -0,0 +1,41 @@
+package org.apache.hadoop.hive.ql.exec;
+
+public class OperatorHookContext {
+ private String operatorName;
+ private String operatorId;
+ private Object currentRow;
+ private Operator operator;
+ public OperatorHookContext(Operator op, Object row) {
+ this(op.getName(), op.getIdentifier(), row);
+ this.operator = op;
+ }
+
+ private OperatorHookContext(String opName, String opId, Object row) {
+ operatorName = opName;
+ operatorId = opId;
+ currentRow = row;
+ }
+
+ public Operator getOperator() {
+ return operator;
+ }
+
+ public String getOperatorName() {
+ return operatorName;
+ }
+
+ public String getOperatorId() {
+ return operatorId;
+ }
+
+ public Object getCurrentRow() {
+ return currentRow;
+ }
+
+ @Override
+ public String toString() {
+ return "operatorName= " + this.getOperatorName() +
+ ", id=" + this.getOperatorId();
+
+ }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java
new file mode 100644
index 0000000..11394cf
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java
@@ -0,0 +1,34 @@
package org.apache.hadoop.hive.ql.exec;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;

/**
 * Helper for instantiating the operator hooks configured via
 * hive.exec.operator.hooks (a comma-separated list of class names).
 */
public final class OperatorHookUtils {

  public static final Log LOG = LogFactory.getLog("OperatorHookUtils");

  private OperatorHookUtils() {
    // Utility class; not instantiable.
  }

  /**
   * Instantiates every hook class named in hive.exec.operator.hooks.
   * Classes that cannot be found are logged and skipped; blank entries
   * (including the default value "", which would otherwise make
   * Class.forName("") fail on every task) are ignored.
   *
   * @param hconf configuration to read the hook list from
   * @return the instantiated hooks; empty when none are configured
   * @throws Exception if a hook class cannot be instantiated or accessed
   */
  public static List<OperatorHook> getOperatorHooks(Configuration hconf)
      throws Exception {
    List<OperatorHook> opHooks = new ArrayList<OperatorHook>();
    String csOpHooks = HiveConf.getVar(hconf, HiveConf.ConfVars.OPERATORHOOKS);
    if (csOpHooks == null || csOpHooks.trim().isEmpty()) {
      return opHooks;
    }
    for (String opHookClass : csOpHooks.split(",")) {
      String className = opHookClass.trim();
      if (className.isEmpty()) {
        // Tolerate stray commas / whitespace-only entries.
        continue;
      }
      try {
        OperatorHook opHook =
            (OperatorHook) Class.forName(className, true,
                JavaUtils.getClassLoader()).newInstance();
        opHooks.add(opHook);
      } catch (ClassNotFoundException e) {
        LOG.error(className + " Class not found: " + e.getMessage());
      }
    }
    return opHooks;
  }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index b380a69..9409c2a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -83,7 +83,6 @@
protected String id;
protected T work;
-
public static enum FeedType {
DYNAMIC_PARTITIONS, // list of dynamic partitions
};
@@ -102,6 +101,10 @@ public Task() {
taskTag = Task.NO_TAG;
}
  /**
   * Returns the handle of the launched Hadoop job; set by
   * HadoopJobExecHelper.progress() while the job runs. Presumably null
   * until the job has been launched — callers should check (see
   * PostTestOperatorHook.logEnterExitCounters).
   */
  public TaskHandle getTaskHandle() {
    return taskHandle;
  }
+
public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
this.queryPlan = queryPlan;
isdone = false;
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java
new file mode 100644
index 0000000..cb52ee8
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java
@@ -0,0 +1,29 @@
+package org.apache.hadoop.hive.ql.exec;
+import java.util.HashMap;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+public class TestOperatorHook implements OperatorHook {
+ protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
+
+ public void enter(OperatorHookContext opHookContext) {
+ incrCounter(TestOperatorHookUtils.TEST_OPERATOR_HOOK_ENTER, opHookContext);
+ }
+
+ public void exit(OperatorHookContext opHookContext) {
+ incrCounter(TestOperatorHookUtils.TEST_OPERATOR_HOOK_EXIT, opHookContext);
+ }
+
+ private void incrCounter(String ctrName, OperatorHookContext opHookContext) {
+ TestOperatorHookUtils.TestOperatorHookCounter ctr =
+ TestOperatorHookUtils.TestOperatorHookCounter.valueOf(ctrName);
+ Operator op = opHookContext.getOperator();
+ LOG.info(ctrName);
+ op.reporter.incrCounter(ctr, 1);
+ Long val = op.reporter.getCounter(ctr).getValue();
+ LOG.info(ctrName + " " + String.valueOf(val));
+ }
+}
+
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java
new file mode 100644
index 0000000..68e4d7f
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java
@@ -0,0 +1,16 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Collection;
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+public class TestOperatorHookUtils {
+ public static final String TEST_OPERATOR_HOOK_ENTER = "TEST_OPERATOR_HOOK_ENTER";
+ public static final String TEST_OPERATOR_HOOK_EXIT = "TEST_OPERATOR_HOOK_EXIT";
+
+ public static enum TestOperatorHookCounter {
+ TEST_OPERATOR_HOOK_ENTER,
+ TEST_OPERATOR_HOOK_EXIT,
+ }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java
new file mode 100644
index 0000000..849a644
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java
@@ -0,0 +1,61 @@
package org.apache.hadoop.hive.ql.hooks;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.Counters;

/**
 * Post-execution hook used by the operatorhook.q test: prints the
 * TEST_OPERATOR_HOOK_* counter values of every completed map/reduce task so
 * the expected-output file can verify the operator hooks fired.
 */
public class PostTestOperatorHook implements ExecuteWithHookContext {

  /**
   * Prints the hook enter/exit counters of one completed task, or a
   * diagnostic message when the task handle or its counters are missing.
   */
  private void logEnterExitCounters(Task<? extends Serializable> task) throws IOException {
    if (task.getTaskHandle() != null) {
      Counters counters = task.getTaskHandle().getCounters();
      if (counters != null) {
        logCounterValue(counters, "TEST_OPERATOR_HOOK_");
      } else {
        SessionState.getConsole().printError("counters are null");
      }
    } else {
      SessionState.getConsole().printError("task handle is null");
    }
  }

  /** Prints every counter whose name contains the given substring. */
  private void logCounterValue(Counters ctr, String name) {
    Collection<String> counterGroups = ctr.getGroupNames();
    for (String groupName : counterGroups) {
      Counters.Group group = ctr.getGroup(groupName);
      Iterator<Counters.Counter> it = group.iterator();
      while (it.hasNext()) {
        Counters.Counter counter = it.next();
        if (counter.getName().contains(name)) {
          SessionState.getConsole().printError(counter.getName() + ": " + counter.getValue());
        }
      }
    }
  }

  public void run(HookContext hookContext) {
    List<TaskRunner> completedTasks = hookContext.getCompleteTaskList();
    if (completedTasks != null) {
      for (TaskRunner taskRunner : completedTasks) {
        Task<? extends Serializable> task = taskRunner.getTask();
        // Only map/reduce tasks launched on the cluster carry counters.
        if (task.isMapRedTask() && !task.isMapRedLocalTask()) {
          try {
            logEnterExitCounters(task);
          } catch (Exception e) {
            SessionState.getConsole().printError("Error in get counters: " + e.toString());
          }
        }
      }
    }
  }
}
diff --git ql/src/test/queries/clientpositive/operatorhook.q ql/src/test/queries/clientpositive/operatorhook.q
new file mode 100644
index 0000000..5822f9c
--- /dev/null
+++ ql/src/test/queries/clientpositive/operatorhook.q
@@ -0,0 +1,9 @@
+SET hive.exec.operator.hooks=org.apache.hadoop.hive.ql.exec.TestOperatorHook;
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostTestOperatorHook;
+SET hive.exec.mode.local.auto=false;
+SET hive.task.progress=true;
+
+-- This query should appear in the Hive CLI output.
+-- We test DriverTestHook, which does exactly that.
+-- This should not break.
+SELECT src.key, sum(substr(src.value, 5)) FROM src GROUP BY src.key
diff --git ql/src/test/results/clientpositive/operatorhook.q.out ql/src/test/results/clientpositive/operatorhook.q.out
new file mode 100644
index 0000000..45f864e
--- /dev/null
+++ ql/src/test/results/clientpositive/operatorhook.q.out
@@ -0,0 +1,318 @@
+PREHOOK: query: -- This query should appear in the Hive CLI output.
+-- We test DriverTestHook, which does exactly that.
+-- This should not break.
+SELECT src.key, sum(substr(src.value, 5)) FROM src GROUP BY src.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+TEST_OPERATOR_HOOK_EXIT: 2736
+TEST_OPERATOR_HOOK_ENTER: 2736
+0 0.0
+10 10.0
+100 200.0
+103 206.0
+104 208.0
+105 105.0
+11 11.0
+111 111.0
+113 226.0
+114 114.0
+116 116.0
+118 236.0
+119 357.0
+12 24.0
+120 240.0
+125 250.0
+126 126.0
+128 384.0
+129 258.0
+131 131.0
+133 133.0
+134 268.0
+136 136.0
+137 274.0
+138 552.0
+143 143.0
+145 145.0
+146 292.0
+149 298.0
+15 30.0
+150 150.0
+152 304.0
+153 153.0
+155 155.0
+156 156.0
+157 157.0
+158 158.0
+160 160.0
+162 162.0
+163 163.0
+164 328.0
+165 330.0
+166 166.0
+167 501.0
+168 168.0
+169 676.0
+17 17.0
+170 170.0
+172 344.0
+174 348.0
+175 350.0
+176 352.0
+177 177.0
+178 178.0
+179 358.0
+18 36.0
+180 180.0
+181 181.0
+183 183.0
+186 186.0
+187 561.0
+189 189.0
+19 19.0
+190 190.0
+191 382.0
+192 192.0
+193 579.0
+194 194.0
+195 390.0
+196 196.0
+197 394.0
+199 597.0
+2 2.0
+20 20.0
+200 400.0
+201 201.0
+202 202.0
+203 406.0
+205 410.0
+207 414.0
+208 624.0
+209 418.0
+213 426.0
+214 214.0
+216 432.0
+217 434.0
+218 218.0
+219 438.0
+221 442.0
+222 222.0
+223 446.0
+224 448.0
+226 226.0
+228 228.0
+229 458.0
+230 1150.0
+233 466.0
+235 235.0
+237 474.0
+238 476.0
+239 478.0
+24 48.0
+241 241.0
+242 484.0
+244 244.0
+247 247.0
+248 248.0
+249 249.0
+252 252.0
+255 510.0
+256 512.0
+257 257.0
+258 258.0
+26 52.0
+260 260.0
+262 262.0
+263 263.0
+265 530.0
+266 266.0
+27 27.0
+272 544.0
+273 819.0
+274 274.0
+275 275.0
+277 1108.0
+278 556.0
+28 28.0
+280 560.0
+281 562.0
+282 564.0
+283 283.0
+284 284.0
+285 285.0
+286 286.0
+287 287.0
+288 576.0
+289 289.0
+291 291.0
+292 292.0
+296 296.0
+298 894.0
+30 30.0
+302 302.0
+305 305.0
+306 306.0
+307 614.0
+308 308.0
+309 618.0
+310 310.0
+311 933.0
+315 315.0
+316 948.0
+317 634.0
+318 954.0
+321 642.0
+322 644.0
+323 323.0
+325 650.0
+327 981.0
+33 33.0
+331 662.0
+332 332.0
+333 666.0
+335 335.0
+336 336.0
+338 338.0
+339 339.0
+34 34.0
+341 341.0
+342 684.0
+344 688.0
+345 345.0
+348 1740.0
+35 105.0
+351 351.0
+353 706.0
+356 356.0
+360 360.0
+362 362.0
+364 364.0
+365 365.0
+366 366.0
+367 734.0
+368 368.0
+369 1107.0
+37 74.0
+373 373.0
+374 374.0
+375 375.0
+377 377.0
+378 378.0
+379 379.0
+382 764.0
+384 1152.0
+386 386.0
+389 389.0
+392 392.0
+393 393.0
+394 394.0
+395 790.0
+396 1188.0
+397 794.0
+399 798.0
+4 4.0
+400 400.0
+401 2005.0
+402 402.0
+403 1209.0
+404 808.0
+406 1624.0
+407 407.0
+409 1227.0
+41 41.0
+411 411.0
+413 826.0
+414 828.0
+417 1251.0
+418 418.0
+419 419.0
+42 84.0
+421 421.0
+424 848.0
+427 427.0
+429 858.0
+43 43.0
+430 1290.0
+431 1293.0
+432 432.0
+435 435.0
+436 436.0
+437 437.0
+438 1314.0
+439 878.0
+44 44.0
+443 443.0
+444 444.0
+446 446.0
+448 448.0
+449 449.0
+452 452.0
+453 453.0
+454 1362.0
+455 455.0
+457 457.0
+458 916.0
+459 918.0
+460 460.0
+462 924.0
+463 926.0
+466 1398.0
+467 467.0
+468 1872.0
+469 2345.0
+47 47.0
+470 470.0
+472 472.0
+475 475.0
+477 477.0
+478 956.0
+479 479.0
+480 1440.0
+481 481.0
+482 482.0
+483 483.0
+484 484.0
+485 485.0
+487 487.0
+489 1956.0
+490 490.0
+491 491.0
+492 984.0
+493 493.0
+494 494.0
+495 495.0
+496 496.0
+497 497.0
+498 1494.0
+5 15.0
+51 102.0
+53 53.0
+54 54.0
+57 57.0
+58 116.0
+64 64.0
+65 65.0
+66 66.0
+67 134.0
+69 69.0
+70 210.0
+72 144.0
+74 74.0
+76 152.0
+77 77.0
+78 78.0
+8 8.0
+80 80.0
+82 82.0
+83 166.0
+84 168.0
+85 85.0
+86 86.0
+87 87.0
+9 9.0
+90 270.0
+92 92.0
+95 190.0
+96 96.0
+97 194.0
+98 196.0