diff --git build-common.xml build-common.xml
index d176a90..5e3f3f6 100644
--- build-common.xml
+++ build-common.xml
@@ -57,7 +57,7 @@
-
+
diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6bf8219..82e0dc5 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -178,6 +178,7 @@
PREEXECHOOKS("hive.exec.pre.hooks", ""),
POSTEXECHOOKS("hive.exec.post.hooks", ""),
ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
+ OPERATORHOOKS("hive.exec.operator.hooks", ""),
CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""),
EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
index 9a8c237..86f2feb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
@@ -23,6 +23,7 @@
import java.lang.management.MemoryMXBean;
import java.net.URLClassLoader;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -50,6 +51,7 @@
private JobConf jc;
private boolean abort = false;
private Reporter rp;
+ private List opHooks;
public static final Log l4j = LogFactory.getLog("ExecMapper");
private static boolean done;
@@ -96,6 +98,7 @@ public void configure(JobConf job) {
mo.setExecContext(execContext);
mo.initializeLocalWork(jc);
mo.initialize(jc, null);
+ opHooks = OperatorHookUtils.getOperatorHooks(jc);
if (localWork == null) {
return;
@@ -130,6 +133,7 @@ public void map(Object key, Object value, OutputCollector output,
rp = reporter;
mo.setOutputCollector(oc);
mo.setReporter(rp);
+ mo.setOperatorHooks(opHooks);
}
// reset the execContext for each new row
execContext.resetRow();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
index 18a9bd2..e63b5ce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,6 +66,7 @@
private long nextCntr = 1;
private static String[] fieldNames;
+ private List opHooks;
public static final Log l4j = LogFactory.getLog("ExecReducer");
private boolean isLogInfoEnabled = false;
@@ -148,6 +150,7 @@ public void configure(JobConf job) {
try {
l4j.info(reducer.dump(0));
reducer.initialize(jc, rowObjectInspector);
+ opHooks = OperatorHookUtils.getOperatorHooks(jc);
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
@@ -178,6 +181,7 @@ public void reduce(Object key, Iterator values, OutputCollector output,
rp = reporter;
reducer.setOutputCollector(oc);
reducer.setReporter(rp);
+ reducer.setOperatorHooks(opHooks);
}
try {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 68302f8..7779885 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -53,6 +53,7 @@
// Bean methods
private static final long serialVersionUID = 1L;
+  List<OperatorHook> operatorHooks; // run around processOp(); null until setOperatorHooks() is called
protected List> childOperators;
protected List> parentOperators;
@@ -226,6 +227,17 @@ public String getIdentifier() {
return id;
}
+  public void setOperatorHooks(List<OperatorHook> opHooks) {
+    // Store the list here, then push the same list down to every child
+    // operator so the whole subtree shares it.
+    operatorHooks = opHooks;
+    if (childOperators != null) {
+      for (Operator<? extends OperatorDesc> op : childOperators) {
+        op.setOperatorHooks(opHooks);
+      }
+    }
+  }
+
public void setReporter(Reporter rep) {
reporter = rep;
@@ -409,6 +421,24 @@ public void passExecContext(ExecMapperContext execContext) {
}
}
+  private void enterOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
+    // Nothing to do until setOperatorHooks() has supplied a list.
+    if (operatorHooks != null) {
+      for (OperatorHook hook : operatorHooks) {
+        hook.enter(opHookContext);
+      }
+    }
+  }
+
+  private void exitOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
+    // Nothing to do until setOperatorHooks() has supplied a list.
+    if (operatorHooks != null) {
+      for (OperatorHook hook : operatorHooks) {
+        hook.exit(opHookContext);
+      }
+    }
+  }
+
/**
* Collects all the parent's output object inspectors and calls actual
* initialization method.
@@ -470,8 +500,12 @@ public void process(Object row, int tag) throws HiveException {
if (fatalError) {
return;
}
+ OperatorHookContext opHookContext = new OperatorHookContext(getName(),
+ getIdentifier(), row, tag);
preProcessCounter();
+ enterOperatorHooks(opHookContext);
processOp(row, tag);
+ exitOperatorHooks(opHookContext);
postProcessCounter();
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java
new file mode 100644
index 0000000..bc98b10
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java
@@ -0,0 +1,12 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+/**
+ * Callback run around each row an Operator processes in a map/reduce task:
+ * enter() is called just before the operator's processOp() and exit() just
+ * after it returns. Implementations are named in hive.exec.operator.hooks.
+ */
+public interface OperatorHook {
+ public void enter(OperatorHookContext opHookContext) throws HiveException;
+ public void exit(OperatorHookContext opHookContext) throws HiveException;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
new file mode 100644
index 0000000..31543bb
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
@@ -0,0 +1,38 @@
+package org.apache.hadoop.hive.ql.exec;
+
+/** Read-only description of one Operator.process() call, handed to each OperatorHook. */
+public class OperatorHookContext {
+  private final String operatorName;
+  private final String operatorId;
+  private final Object currentRow;
+  private final int parentId; // the tag argument of Operator.process()
+  public OperatorHookContext(String opName, String opId, Object row, int tag) {
+    operatorName = opName;
+    operatorId = opId;
+    currentRow = row;
+    parentId = tag;
+  }
+
+  public String getOperatorName() {
+    return operatorName;
+  }
+
+  public String getOperatorId() {
+    return operatorId;
+  }
+
+  public Object getCurrentRow() {
+    return currentRow;
+  }
+
+  public int getParentId() {
+    return parentId;
+  }
+
+  @Override
+  public String toString() {
+    // The row is not part of the string form.
+    return "operatorName= " + getOperatorName() + ", id=" + getOperatorId()
+        + ", parentId=" + getParentId();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java
new file mode 100644
index 0000000..11394cf
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java
@@ -0,0 +1,50 @@
+
+package org.apache.hadoop.hive.ql.exec;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+/**
+ * Loads the OperatorHook implementations named in hive.exec.operator.hooks.
+ */
+public class OperatorHookUtils {
+
+  public static final Log LOG = LogFactory.getLog("OperatorHookUtils");
+  /**
+   * Instantiates each class listed in the comma-separated
+   * hive.exec.operator.hooks setting, in the order listed.
+   *
+   * @param hconf configuration the setting is read from
+   * @return the instantiated hooks; empty, never null, when none are listed
+   * @throws Exception if a listed class is found but cannot be instantiated
+   */
+  public static List<OperatorHook> getOperatorHooks(Configuration hconf)
+      throws Exception {
+    List<OperatorHook> opHooks = new ArrayList<OperatorHook>();
+    String csOpHooks = HiveConf.getVar(hconf, HiveConf.ConfVars.OPERATORHOOKS);
+    if (csOpHooks == null) {
+      return opHooks;
+    }
+    for (String opHookClass : csOpHooks.split(",")) {
+      String className = opHookClass.trim();
+      // The default value is "" and "".split(",") yields one empty entry;
+      // skip blanks so an unset option does not log a bogus error.
+      if (className.isEmpty()) {
+        continue;
+      }
+      try {
+        OperatorHook opHook = (OperatorHook) Class.forName(className, true,
+            JavaUtils.getClassLoader()).newInstance();
+        opHooks.add(opHook);
+      } catch (ClassNotFoundException e) {
+        // A missing class is logged and skipped; the remaining hooks still load.
+        LOG.error(opHookClass + " Class not found: " + e.getMessage(), e);
+      }
+    }
+    return opHooks;
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java
new file mode 100644
index 0000000..2c91938
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java
@@ -0,0 +1,19 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+/** Test hook that prints each context on enter and the running call counts on exit. */
+public class TestOperatorHook implements OperatorHook {
+  private int numEnters;
+  private int numExits;
+  public void enter(OperatorHookContext opHookContext) {
+    SessionState.getConsole().printError(opHookContext.toString());
+    numEnters++;
+  }
+  public void exit(OperatorHookContext opHookContext) {
+    numExits++;
+    SessionState.getConsole().printError("enters: " + numEnters + ", exits: " + numExits);
+  }
+}
+
diff --git ql/src/test/queries/clientpositive/operatorhook.q ql/src/test/queries/clientpositive/operatorhook.q
new file mode 100644
index 0000000..a52cb5a
--- /dev/null
+++ ql/src/test/queries/clientpositive/operatorhook.q
@@ -0,0 +1,8 @@
+SET hive.exec.operator.hooks=org.apache.hadoop.hive.ql.exec.TestOperatorHook;
+SET hive.exec.mode.local.auto=false;
+SET hive.task.progress=true;
+
+-- This query should appear in the Hive CLI output.
+-- We test DriverTestHook, which does exactly that.
+-- This should not break.
+SELECT src.key, sum(substr(src.value, 5)) FROM src GROUP BY src.key
diff --git ql/src/test/results/clientpositive/operatorhook.q.out ql/src/test/results/clientpositive/operatorhook.q.out
new file mode 100644
index 0000000..bbb8d22
--- /dev/null
+++ ql/src/test/results/clientpositive/operatorhook.q.out
@@ -0,0 +1,323 @@
+PREHOOK: query: -- This query should appear in the Hive CLI output.
+-- We test DriverTestHook, which does exactly that.
+-- This should not break.
+SELECT src.key, sum(substr(src.value, 5)) FROM src GROUP BY src.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: -- This query should appear in the Hive CLI output.
+-- We test DriverTestHook, which does exactly that.
+-- This should not break.
+SELECT src.key, sum(substr(src.value, 5)) FROM src GROUP BY src.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0 0.0
+10 10.0
+100 200.0
+103 206.0
+104 208.0
+105 105.0
+11 11.0
+111 111.0
+113 226.0
+114 114.0
+116 116.0
+118 236.0
+119 357.0
+12 24.0
+120 240.0
+125 250.0
+126 126.0
+128 384.0
+129 258.0
+131 131.0
+133 133.0
+134 268.0
+136 136.0
+137 274.0
+138 552.0
+143 143.0
+145 145.0
+146 292.0
+149 298.0
+15 30.0
+150 150.0
+152 304.0
+153 153.0
+155 155.0
+156 156.0
+157 157.0
+158 158.0
+160 160.0
+162 162.0
+163 163.0
+164 328.0
+165 330.0
+166 166.0
+167 501.0
+168 168.0
+169 676.0
+17 17.0
+170 170.0
+172 344.0
+174 348.0
+175 350.0
+176 352.0
+177 177.0
+178 178.0
+179 358.0
+18 36.0
+180 180.0
+181 181.0
+183 183.0
+186 186.0
+187 561.0
+189 189.0
+19 19.0
+190 190.0
+191 382.0
+192 192.0
+193 579.0
+194 194.0
+195 390.0
+196 196.0
+197 394.0
+199 597.0
+2 2.0
+20 20.0
+200 400.0
+201 201.0
+202 202.0
+203 406.0
+205 410.0
+207 414.0
+208 624.0
+209 418.0
+213 426.0
+214 214.0
+216 432.0
+217 434.0
+218 218.0
+219 438.0
+221 442.0
+222 222.0
+223 446.0
+224 448.0
+226 226.0
+228 228.0
+229 458.0
+230 1150.0
+233 466.0
+235 235.0
+237 474.0
+238 476.0
+239 478.0
+24 48.0
+241 241.0
+242 484.0
+244 244.0
+247 247.0
+248 248.0
+249 249.0
+252 252.0
+255 510.0
+256 512.0
+257 257.0
+258 258.0
+26 52.0
+260 260.0
+262 262.0
+263 263.0
+265 530.0
+266 266.0
+27 27.0
+272 544.0
+273 819.0
+274 274.0
+275 275.0
+277 1108.0
+278 556.0
+28 28.0
+280 560.0
+281 562.0
+282 564.0
+283 283.0
+284 284.0
+285 285.0
+286 286.0
+287 287.0
+288 576.0
+289 289.0
+291 291.0
+292 292.0
+296 296.0
+298 894.0
+30 30.0
+302 302.0
+305 305.0
+306 306.0
+307 614.0
+308 308.0
+309 618.0
+310 310.0
+311 933.0
+315 315.0
+316 948.0
+317 634.0
+318 954.0
+321 642.0
+322 644.0
+323 323.0
+325 650.0
+327 981.0
+33 33.0
+331 662.0
+332 332.0
+333 666.0
+335 335.0
+336 336.0
+338 338.0
+339 339.0
+34 34.0
+341 341.0
+342 684.0
+344 688.0
+345 345.0
+348 1740.0
+35 105.0
+351 351.0
+353 706.0
+356 356.0
+360 360.0
+362 362.0
+364 364.0
+365 365.0
+366 366.0
+367 734.0
+368 368.0
+369 1107.0
+37 74.0
+373 373.0
+374 374.0
+375 375.0
+377 377.0
+378 378.0
+379 379.0
+382 764.0
+384 1152.0
+386 386.0
+389 389.0
+392 392.0
+393 393.0
+394 394.0
+395 790.0
+396 1188.0
+397 794.0
+399 798.0
+4 4.0
+400 400.0
+401 2005.0
+402 402.0
+403 1209.0
+404 808.0
+406 1624.0
+407 407.0
+409 1227.0
+41 41.0
+411 411.0
+413 826.0
+414 828.0
+417 1251.0
+418 418.0
+419 419.0
+42 84.0
+421 421.0
+424 848.0
+427 427.0
+429 858.0
+43 43.0
+430 1290.0
+431 1293.0
+432 432.0
+435 435.0
+436 436.0
+437 437.0
+438 1314.0
+439 878.0
+44 44.0
+443 443.0
+444 444.0
+446 446.0
+448 448.0
+449 449.0
+452 452.0
+453 453.0
+454 1362.0
+455 455.0
+457 457.0
+458 916.0
+459 918.0
+460 460.0
+462 924.0
+463 926.0
+466 1398.0
+467 467.0
+468 1872.0
+469 2345.0
+47 47.0
+470 470.0
+472 472.0
+475 475.0
+477 477.0
+478 956.0
+479 479.0
+480 1440.0
+481 481.0
+482 482.0
+483 483.0
+484 484.0
+485 485.0
+487 487.0
+489 1956.0
+490 490.0
+491 491.0
+492 984.0
+493 493.0
+494 494.0
+495 495.0
+496 496.0
+497 497.0
+498 1494.0
+5 15.0
+51 102.0
+53 53.0
+54 54.0
+57 57.0
+58 116.0
+64 64.0
+65 65.0
+66 66.0
+67 134.0
+69 69.0
+70 210.0
+72 144.0
+74 74.0
+76 152.0
+77 77.0
+78 78.0
+8 8.0
+80 80.0
+82 82.0
+83 166.0
+84 168.0
+85 85.0
+86 86.0
+87 87.0
+9 9.0
+90 270.0
+92 92.0
+95 190.0
+96 96.0
+97 194.0
+98 196.0