diff --git build-common.xml build-common.xml
index b88ddbc..6b7cc0f 100644
--- build-common.xml
+++ build-common.xml
@@ -57,7 +57,7 @@
-
+
diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 15fce1e..a42e9f1 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -178,6 +178,7 @@
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
     POSTEXECHOOKS("hive.exec.post.hooks", ""),
     ONFAILUREHOOKS("hive.exec.failure.hooks", ""),
+    OPERATORHOOKS("hive.exec.operator.hooks", ""),
     CLIENTSTATSPUBLISHERS("hive.client.stats.publishers", ""),
     EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
index 9a8c237..8b2866f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
@@ -50,6 +50,7 @@
   private JobConf jc;
   private boolean abort = false;
   private Reporter rp;
+  private List<OperatorHook> opHooks;
   public static final Log l4j = LogFactory.getLog("ExecMapper");
   private static boolean done;
@@ -96,6 +97,7 @@ public void configure(JobConf job) {
       mo.setExecContext(execContext);
       mo.initializeLocalWork(jc);
       mo.initialize(jc, null);
+      opHooks = OperatorHookUtils.getOperatorHooks(jc);

       if (localWork == null) {
         return;
@@ -130,6 +132,7 @@ public void map(Object key, Object value, OutputCollector output,
       rp = reporter;
       mo.setOutputCollector(oc);
       mo.setReporter(rp);
+      mo.setOperatorHooks(opHooks);
     }
     // reset the execContext for each new row
     execContext.resetRow();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
index 18a9bd2..e63b5ce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,6 +66,7 @@
   private long nextCntr = 1;
   private static String[] fieldNames;
+  private List<OperatorHook> opHooks;
   public static final Log l4j = LogFactory.getLog("ExecReducer");
   private boolean isLogInfoEnabled = false;
@@ -148,6 +150,7 @@ public void configure(JobConf job) {
     try {
       l4j.info(reducer.dump(0));
       reducer.initialize(jc, rowObjectInspector);
+      opHooks = OperatorHookUtils.getOperatorHooks(jc);
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
@@ -178,6 +181,7 @@ public void reduce(Object key, Iterator values, OutputCollector output,
       rp = reporter;
       reducer.setOutputCollector(oc);
       reducer.setReporter(rp);
+      reducer.setOperatorHooks(opHooks);
     }

     try {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
index 83cf653..effc540 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
@@ -531,6 +531,7 @@ public int progress(RunningJob rj, JobClient jc) throws IOException {
     jobInfo(rj);
     MapRedStats mapRedStats = progress(th);
+    this.task.taskHandle = th;

     // Not always there is a SessionState. Sometimes ExeDriver is directly invoked
     // for special modes. In that case, SessionState.get() is empty.
     if (SessionState.get() != null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 68302f8..84d6fc1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -53,7 +53,9 @@
   // Bean methods

   private static final long serialVersionUID = 1L;
+  List<OperatorHook> operatorHooks;
+  private Configuration configuration;
   protected List<Operator<? extends Serializable>> childOperators;
   protected List<Operator<? extends Serializable>> parentOperators;
   protected String operatorId;
@@ -129,6 +131,9 @@ public void setChildOperators(
     this.childOperators = childOperators;
   }

+  public Configuration getConfiguration() {
+    return configuration;
+  }
   public List<Operator<? extends Serializable>> getChildOperators() {
     return childOperators;
   }
@@ -226,6 +231,17 @@ public String getIdentifier() {
     return id;
   }

+  public void setOperatorHooks(List<OperatorHook> opHooks) {
+    operatorHooks = opHooks;
+    if (childOperators == null) {
+      return;
+    }
+
+    for (Operator<? extends Serializable> op : childOperators) {
+      op.setOperatorHooks(opHooks);
+    }
+  }
+
   public void setReporter(Reporter rep) {
     reporter = rep;
@@ -313,6 +329,7 @@ public void initialize(Configuration hconf, ObjectInspector[] inputOIs)
       return;
     }
+    this.configuration = hconf;
     this.out = null;
     if (!areAllParentsInitialized()) {
       return;
     }
@@ -409,6 +426,24 @@ public void passExecContext(ExecMapperContext execContext) {
     }
   }

+  private void enterOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
+    if (this.operatorHooks == null) {
+      return;
+    }
+    for (OperatorHook opHook : this.operatorHooks) {
+      opHook.enter(opHookContext);
+    }
+  }
+
+  private void exitOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
+    if (this.operatorHooks == null) {
+      return;
+    }
+    for (OperatorHook opHook : this.operatorHooks) {
+      opHook.exit(opHookContext);
+    }
+  }
+
   /**
    * Collects all the parent's output object inspectors and calls actual
    * initialization method.
@@ -470,8 +505,11 @@ public void process(Object row, int tag) throws HiveException {
     if (fatalError) {
       return;
     }
+    OperatorHookContext opHookContext = new OperatorHookContext(this, row);
     preProcessCounter();
+    enterOperatorHooks(opHookContext);
     processOp(row, tag);
+    exitOperatorHooks(opHookContext);
     postProcessCounter();
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java
new file mode 100644
index 0000000..bc98b10
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHook.java
@@ -0,0 +1,12 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * OperatorHook is an interface whose implementations are invoked immediately
+ * before and after each Operator processes a row in a map/reduce task.
+ */
+public interface OperatorHook {
+  public void enter(OperatorHookContext opHookContext) throws HiveException;
+  public void exit(OperatorHookContext opHookContext) throws HiveException;
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
new file mode 100644
index 0000000..9eb559f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookContext.java
@@ -0,0 +1,41 @@
+package org.apache.hadoop.hive.ql.exec;
+
+public class OperatorHookContext {
+  private String operatorName;
+  private String operatorId;
+  private Object currentRow;
+  private Operator operator;
+  public OperatorHookContext(Operator op, Object row) {
+    this(op.getName(), op.getIdentifier(), row);
+    this.operator = op;
+  }
+
+  private OperatorHookContext(String opName, String opId, Object row) {
+    operatorName = opName;
+    operatorId = opId;
+    currentRow = row;
+  }
+
+  public Operator getOperator() {
+    return operator;
+  }
+
+  public String getOperatorName() {
+    return operatorName;
+  }
+
+  public String getOperatorId() {
+    return operatorId;
+  }
+
+  public Object getCurrentRow() {
+    return currentRow;
+  }
+
+  @Override
+  public String toString() {
+    return "operatorName= " + this.getOperatorName() +
+        ", id=" + this.getOperatorId();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java
new file mode 100644
index 0000000..11394cf
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorHookUtils.java
@@ -0,0 +1,34 @@
+
+package org.apache.hadoop.hive.ql.exec;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+public class OperatorHookUtils {
+
+  public static final Log LOG = LogFactory.getLog("OperatorHookUtils");
+  public static List<OperatorHook> getOperatorHooks(Configuration hconf)
+      throws Exception {
+    List<OperatorHook> opHooks = new ArrayList<OperatorHook>();
+    String csOpHooks = HiveConf.getVar(hconf, HiveConf.ConfVars.OPERATORHOOKS);
+    if (csOpHooks == null || csOpHooks.trim().isEmpty()) {
+      return opHooks;
+    }
+    String[] opHookClasses = csOpHooks.split(",");
+    for (String opHookClass : opHookClasses) {
+      try {
+        OperatorHook opHook =
+            (OperatorHook) Class.forName(opHookClass.trim(), true,
+                JavaUtils.getClassLoader()).newInstance();
+        opHooks.add(opHook);
+      } catch (ClassNotFoundException e) {
+        LOG.error(opHookClass + " Class not found: " + e.getMessage());
+      }
+    }
+    return opHooks;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index b380a69..9409c2a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -83,7 +83,6 @@
   protected String id;
   protected T work;
-
   public static enum FeedType {
     DYNAMIC_PARTITIONS, // list of dynamic partitions
   };
@@ -102,6 +101,10 @@ public Task() {
     taskTag = Task.NO_TAG;
   }

+  public TaskHandle getTaskHandle() {
+    return taskHandle;
+  }
+
   public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
     this.queryPlan = queryPlan;
     isdone = false;
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java
new file mode 100644
index 0000000..cb52ee8
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHook.java
@@ -0,0 +1,29 @@
+package org.apache.hadoop.hive.ql.exec;
+import java.util.HashMap;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+public class TestOperatorHook implements OperatorHook {
+  protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
+
+  public void enter(OperatorHookContext opHookContext) {
+    incrCounter(TestOperatorHookUtils.TEST_OPERATOR_HOOK_ENTER, opHookContext);
+  }
+
+  public void exit(OperatorHookContext opHookContext) {
+    incrCounter(TestOperatorHookUtils.TEST_OPERATOR_HOOK_EXIT, opHookContext);
+  }
+
+  private void incrCounter(String ctrName, OperatorHookContext opHookContext) {
+    TestOperatorHookUtils.TestOperatorHookCounter ctr =
+        TestOperatorHookUtils.TestOperatorHookCounter.valueOf(ctrName);
+    Operator op = opHookContext.getOperator();
+    LOG.info(ctrName);
+    op.reporter.incrCounter(ctr, 1);
+    Long val = op.reporter.getCounter(ctr).getValue();
+    LOG.info(ctrName + " " + String.valueOf(val));
+  }
+}
+
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java
new file mode 100644
index 0000000..68e4d7f
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperatorHookUtils.java
@@ -0,0 +1,16 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Collection;
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+public class TestOperatorHookUtils {
+  public static final String TEST_OPERATOR_HOOK_ENTER = "TEST_OPERATOR_HOOK_ENTER";
+  public static final String TEST_OPERATOR_HOOK_EXIT = "TEST_OPERATOR_HOOK_EXIT";
+
+  public static enum TestOperatorHookCounter {
+    TEST_OPERATOR_HOOK_ENTER,
+    TEST_OPERATOR_HOOK_EXIT,
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java
new file mode 100644
index 0000000..849a644
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/PostTestOperatorHook.java
@@ -0,0 +1,61 @@
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Iterator;
+import java.io.Serializable;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapred.Counters;
+
+public class PostTestOperatorHook implements ExecuteWithHookContext {
+  private void logEnterExitCounters(Task<? extends Serializable> task) throws IOException {
+    if (task.getTaskHandle() != null) {
+      Counters counters = task.getTaskHandle().getCounters();
+      if (counters != null) {
+        logCounterValue(counters, "TEST_OPERATOR_HOOK_");
+      } else {
+        SessionState.getConsole().printError("counters are null");
+      }
+    } else {
+      SessionState.getConsole().printError("task handle is null");
+    }
+  }
+
+  private void logCounterValue(Counters ctr, String name) {
+    Collection<String> counterGroups = ctr.getGroupNames();
+    for (String groupName : counterGroups) {
+      Counters.Group group = ctr.getGroup(groupName);
+      Iterator<Counters.Counter> it = group.iterator();
+      while (it.hasNext()) {
+        Counters.Counter counter = it.next();
+        if (counter.getName().contains(name)) {
+          SessionState.getConsole().printError(counter.getName() + ": " + counter.getValue());
+        }
+      }
+    }
+  }
+
+  public void run(HookContext hookContext) {
+    HiveConf conf = hookContext.getConf();
+    List<TaskRunner> completedTasks = hookContext.getCompleteTaskList();
+    if (completedTasks != null) {
+      for (TaskRunner taskRunner : completedTasks) {
+        Task<? extends Serializable> task = taskRunner.getTask();
+        if (task.isMapRedTask() && !task.isMapRedLocalTask()) {
+          try {
+            logEnterExitCounters(task);
+          } catch (Exception e) {
+            SessionState.getConsole().printError("Error getting counters: " + e.toString());
+          }
+        }
+      }
+    }
+  }
+}
diff --git ql/src/test/queries/clientpositive/operatorhook.q ql/src/test/queries/clientpositive/operatorhook.q
new file mode 100644
index 0000000..5822f9c
--- /dev/null
+++ ql/src/test/queries/clientpositive/operatorhook.q
@@ -0,0 +1,9 @@
+SET hive.exec.operator.hooks=org.apache.hadoop.hive.ql.exec.TestOperatorHook;
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostTestOperatorHook;
+SET hive.exec.mode.local.auto=false;
+SET hive.task.progress=true;
+
+-- TestOperatorHook increments an enter and an exit counter for every row each operator processes.
+-- PostTestOperatorHook then prints those counters after the map/reduce job finishes.
+-- This should not break.
+SELECT src.key, sum(substr(src.value, 5)) FROM src GROUP BY src.key
diff --git ql/src/test/results/clientpositive/operatorhook.q.out ql/src/test/results/clientpositive/operatorhook.q.out
new file mode 100644
index 0000000..45f864e
--- /dev/null
+++ ql/src/test/results/clientpositive/operatorhook.q.out
@@ -0,0 +1,318 @@
+PREHOOK: query: -- TestOperatorHook increments an enter and an exit counter for every row each operator processes.
+-- PostTestOperatorHook then prints those counters after the map/reduce job finishes.
+-- This should not break.
+SELECT src.key, sum(substr(src.value, 5)) FROM src GROUP BY src.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+TEST_OPERATOR_HOOK_EXIT: 2736
+TEST_OPERATOR_HOOK_ENTER: 2736
+0 0.0
+10 10.0
+100 200.0
+103 206.0
+104 208.0
+105 105.0
+11 11.0
+111 111.0
+113 226.0
+114 114.0
+116 116.0
+118 236.0
+119 357.0
+12 24.0
+120 240.0
+125 250.0
+126 126.0
+128 384.0
+129 258.0
+131 131.0
+133 133.0
+134 268.0
+136 136.0
+137 274.0
+138 552.0
+143 143.0
+145 145.0
+146 292.0
+149 298.0
+15 30.0
+150 150.0
+152 304.0
+153 153.0
+155 155.0
+156 156.0
+157 157.0
+158 158.0
+160 160.0
+162 162.0
+163 163.0
+164 328.0
+165 330.0
+166 166.0
+167 501.0
+168 168.0
+169 676.0
+17 17.0
+170 170.0
+172 344.0
+174 348.0
+175 350.0
+176 352.0
+177 177.0
+178 178.0
+179 358.0
+18 36.0
+180 180.0
+181 181.0
+183 183.0
+186 186.0
+187 561.0
+189 189.0
+19 19.0
+190 190.0
+191 382.0
+192 192.0
+193 579.0
+194 194.0
+195 390.0
+196 196.0
+197 394.0
+199 597.0
+2 2.0
+20 20.0
+200 400.0
+201 201.0
+202 202.0
+203 406.0
+205 410.0
+207 414.0
+208 624.0
+209 418.0
+213 426.0
+214 214.0
+216 432.0
+217 434.0
+218 218.0
+219 438.0
+221 442.0
+222 222.0
+223 446.0
+224 448.0
+226 226.0
+228 228.0
+229 458.0
+230 1150.0
+233 466.0
+235 235.0
+237 474.0
+238 476.0
+239 478.0
+24 48.0
+241 241.0
+242 484.0
+244 244.0
+247 247.0
+248 248.0
+249 249.0
+252 252.0
+255 510.0
+256 512.0
+257 257.0
+258 258.0
+26 52.0
+260 260.0
+262 262.0
+263 263.0
+265 530.0
+266 266.0
+27 27.0
+272 544.0
+273 819.0
+274 274.0
+275 275.0
+277 1108.0
+278 556.0
+28 28.0
+280 560.0
+281 562.0
+282 564.0
+283 283.0
+284 284.0
+285 285.0
+286 286.0
+287 287.0
+288 576.0
+289 289.0
+291 291.0
+292 292.0
+296 296.0
+298 894.0
+30 30.0
+302 302.0
+305 305.0
+306 306.0
+307 614.0
+308 308.0
+309 618.0
+310 310.0
+311 933.0
+315 315.0
+316 948.0
+317 634.0
+318 954.0
+321 642.0
+322 644.0
+323 323.0
+325 650.0
+327 981.0
+33 33.0
+331 662.0
+332 332.0
+333 666.0
+335 335.0
+336 336.0
+338 338.0
+339 339.0
+34 34.0
+341 341.0
+342 684.0
+344 688.0
+345 345.0
+348 1740.0
+35 105.0
+351 351.0
+353 706.0
+356 356.0
+360 360.0
+362 362.0
+364 364.0
+365 365.0
+366 366.0
+367 734.0
+368 368.0
+369 1107.0
+37 74.0
+373 373.0
+374 374.0
+375 375.0
+377 377.0
+378 378.0
+379 379.0
+382 764.0
+384 1152.0
+386 386.0
+389 389.0
+392 392.0
+393 393.0
+394 394.0
+395 790.0
+396 1188.0
+397 794.0
+399 798.0
+4 4.0
+400 400.0
+401 2005.0
+402 402.0
+403 1209.0
+404 808.0
+406 1624.0
+407 407.0
+409 1227.0
+41 41.0
+411 411.0
+413 826.0
+414 828.0
+417 1251.0
+418 418.0
+419 419.0
+42 84.0
+421 421.0
+424 848.0
+427 427.0
+429 858.0
+43 43.0
+430 1290.0
+431 1293.0
+432 432.0
+435 435.0
+436 436.0
+437 437.0
+438 1314.0
+439 878.0
+44 44.0
+443 443.0
+444 444.0
+446 446.0
+448 448.0
+449 449.0
+452 452.0
+453 453.0
+454 1362.0
+455 455.0
+457 457.0
+458 916.0
+459 918.0
+460 460.0
+462 924.0
+463 926.0
+466 1398.0
+467 467.0
+468 1872.0
+469 2345.0
+47 47.0
+470 470.0
+472 472.0
+475 475.0
+477 477.0
+478 956.0
+479 479.0
+480 1440.0
+481 481.0
+482 482.0
+483 483.0
+484 484.0
+485 485.0
+487 487.0
+489 1956.0
+490 490.0
+491 491.0
+492 984.0
+493 493.0
+494 494.0
+495 495.0
+496 496.0
+497 497.0
+498 1494.0
+5 15.0
+51 102.0
+53 53.0
+54 54.0
+57 57.0
+58 116.0
+64 64.0
+65 65.0
+66 66.0
+67 134.0
+69 69.0
+70 210.0
+72 144.0
+74 74.0
+76 152.0
+77 77.0
+78 78.0
+8 8.0
+80 80.0
+82 82.0
+83 166.0
+84 168.0
+85 85.0
+86 86.0
+87 87.0
+9 9.0
+90 270.0
+92 92.0
+95 190.0
+96 96.0
+97 194.0
+98 196.0
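
Note for reviewers (not part of the patch): the sketch below shows what a user-supplied hook built against the new OperatorHook / OperatorHookContext API might look like. The class name RowCountOperatorHook and its logging interval are invented for illustration. Because OperatorHookUtils.getOperatorHooks instantiates one hook object per map/reduce task and Operator.setOperatorHooks pushes that same list down the operator tree, a hook can keep per-operator state keyed by the operator name and id exposed through the context.

package org.apache.hadoop.hive.ql.exec;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;

/**
 * Hypothetical example, not part of the patch: counts the rows that flow
 * through each operator and logs a running total every 100,000 rows.
 */
public class RowCountOperatorHook implements OperatorHook {

  private static final Log LOG = LogFactory.getLog("RowCountOperatorHook");
  private static final long LOG_INTERVAL = 100000L;

  // One hook instance is shared by all operators in a task (the same list is
  // pushed down the tree by Operator.setOperatorHooks), so keep per-operator
  // counts keyed by the name/id pair exposed through the context.
  private final Map<String, Long> rowCounts = new HashMap<String, Long>();

  public void enter(OperatorHookContext opHookContext) throws HiveException {
    String key = opHookContext.getOperatorName() + "_" + opHookContext.getOperatorId();
    Long current = rowCounts.get(key);
    long next = (current == null) ? 1L : current.longValue() + 1L;
    rowCounts.put(key, Long.valueOf(next));
    if (next % LOG_INTERVAL == 0) {
      LOG.info(key + " has processed " + next + " rows");
    }
  }

  public void exit(OperatorHookContext opHookContext) throws HiveException {
    // Nothing to do once the operator has finished processing the row.
  }
}

Assuming the class has been made available on the task classpath (for example through an auxiliary jar), it would be enabled the same way as the test hook above:

SET hive.exec.operator.hooks=org.apache.hadoop.hive.ql.exec.RowCountOperatorHook;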