Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 945403)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -81,6 +81,7 @@
     MAXREDUCERS("hive.exec.reducers.max", 999),
     PREEXECHOOKS("hive.exec.pre.hooks", ""),
     POSTEXECHOOKS("hive.exec.post.hooks", ""),
+    PRETASKHOOKS("hive.exec.pre.task.hooks", ""),
     EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
     HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true),
Index: ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java	(revision 945403)
+++ ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java	(working copy)
@@ -20,10 +20,11 @@
 
 import java.io.Serializable;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Queue;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.PreTask;
 
 /**
  * DriverContext.
@@ -38,6 +39,11 @@
   Context ctx;
 
+  /**
+   * List of hooks fired before task execution.
+   */
+  private List<PreTask> preTaskHooks;
+
   public DriverContext() {
     this.runnable = null;
     this.ctx = null;
   }
@@ -82,4 +88,18 @@
   public void incCurJobNo(int amount) {
     this.curJobNo = this.curJobNo + amount;
   }
+
+  /**
+   * @param preTaskHooks the preTaskHooks to set
+   */
+  public void setPreTaskHooks(List<PreTask> preTaskHooks) {
+    this.preTaskHooks = preTaskHooks;
+  }
+
+  /**
+   * @return the preTaskHooks
+   */
+  public List<PreTask> getPreTaskHooks() {
+    return preTaskHooks;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/PreTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/PreTask.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/PreTask.java	(revision 0)
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * PreTask Hook. This hook is called before the execution of any task. It can
+ * be used to implement task execution policies, such as sending jobs to
+ * different map/reduce clusters depending on their size.
+ */
+public interface PreTask {
+
+  /**
+   * Called before the task is executed.
+   *
+   * @param tskRunner The task runner for the task.
+   * @param ugi The user group information for the task.
+   * @throws Exception
+   */
+  public void run(TaskRunner tskRunner, UserGroupInformation ugi)
+      throws Exception;
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(revision 945403)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(working copy)
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.hooks.PostExecute;
 import org.apache.hadoop.hive.ql.hooks.PreExecute;
+import org.apache.hadoop.hive.ql.hooks.PreTask;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ErrorMsg;
@@ -387,50 +388,38 @@
     return new CommandProcessorResponse(ret);
   }
 
-  private List<PreExecute> getPreExecHooks() throws Exception {
-    ArrayList<PreExecute> pehooks = new ArrayList<PreExecute>();
-    String pestr = conf.getVar(HiveConf.ConfVars.PREEXECHOOKS);
-    pestr = pestr.trim();
-    if (pestr.equals("")) {
-      return pehooks;
+  private <T> List<T> getHookList(String hookConfStr) throws Exception {
+    ArrayList<T> hooks = new ArrayList<T>();
+    String confStr = hookConfStr.trim();
+    if (confStr.equals("")) {
+      return hooks;
     }
 
-    String[] peClasses = pestr.split(",");
+    String[] hClasses = confStr.split(",");
 
-    for (String peClass : peClasses) {
+    for (String hClass : hClasses) {
       try {
-        pehooks.add((PreExecute) Class.forName(peClass.trim(), true,
+        hooks.add((T) Class.forName(hClass.trim(), true,
            JavaUtils.getClassLoader()).newInstance());
       } catch (ClassNotFoundException e) {
-        console.printError("Pre Exec Hook Class not found:" + e.getMessage());
+        console.printError("Hook Class not found:" + e.getMessage());
        throw e;
      }
    }
 
-    return pehooks;
+    return hooks;
   }
 
+  private List<PreExecute> getPreExecHooks() throws Exception {
+    return getHookList(conf.getVar(HiveConf.ConfVars.PREEXECHOOKS));
+  }
+
   private List<PostExecute> getPostExecHooks() throws Exception {
-    ArrayList<PostExecute> pehooks = new ArrayList<PostExecute>();
-    String pestr = conf.getVar(HiveConf.ConfVars.POSTEXECHOOKS);
-    pestr = pestr.trim();
-    if (pestr.equals("")) {
-      return pehooks;
-    }
+    return getHookList(conf.getVar(HiveConf.ConfVars.POSTEXECHOOKS));
+  }
 
-    String[] peClasses = pestr.split(",");
-
-    for (String peClass : peClasses) {
-      try {
-        pehooks.add((PostExecute) Class.forName(peClass.trim(), true,
-            JavaUtils.getClassLoader()).newInstance());
-      } catch (ClassNotFoundException e) {
-        console.printError("Post Exec Hook Class not found:" + e.getMessage());
-        throw e;
-      }
-    }
-
-    return pehooks;
+  private List<PreTask> getPreTaskHooks() throws Exception {
+    return getHookList(conf.getVar(HiveConf.ConfVars.PRETASKHOOKS));
   }
 
   public int execute() {
@@ -488,6 +477,8 @@
 
       Map<TaskResult, TaskRunner> running = new HashMap<TaskResult, TaskRunner>();
 
       DriverContext driverCxt = new DriverContext(runnable, ctx);
+      // Set the task hooks
+      driverCxt.setPreTaskHooks(getPreTaskHooks());
 
       // Add root Tasks to runnable
@@ -604,7 +595,7 @@
 
   public void launchTask(Task<? extends Serializable> tsk, String queryId, boolean noName,
       Map<TaskResult, TaskRunner> running, String jobname,
-      int jobs, DriverContext cxt) {
+      int jobs, DriverContext cxt) throws Exception {
 
     if (SessionState.get() != null) {
       SessionState.get().getHiveHistory().startTask(queryId, tsk,
@@ -622,6 +613,12 @@
     TaskResult tskRes = new TaskResult();
     TaskRunner tskRun = new TaskRunner(tsk, tskRes);
 
+    // Execute the pre task hooks
+    for (PreTask pt : cxt.getPreTaskHooks()) {
+      pt.run(tskRun, UnixUserGroupInformation.readFromConf(conf,
+          UnixUserGroupInformation.UGI_PROPERTY_NAME));
+    }
+
     // Launch Task
     if (HiveConf.getBoolVar(conf,
        HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
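
For illustration, a minimal sketch of what an implementation of the new PreTask interface could look like. The class name LoggingPreTaskHook and its logging behavior are hypothetical and not part of this patch; it only assumes TaskRunner.getTask(), Task.getId() and UserGroupInformation.getUserName().

package org.apache.hadoop.hive.ql.hooks;

import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Hypothetical example hook: logs the task that is about to run and the
 * submitting user. A real policy hook could use this callback to reroute
 * the task, e.g. to a different map/reduce cluster based on input size.
 */
public class LoggingPreTaskHook implements PreTask {

  public void run(TaskRunner tskRunner, UserGroupInformation ugi)
      throws Exception {
    // Invoked by Driver.launchTask() just before the task is started.
    System.out.println("Pre-task hook: about to launch task "
        + tskRunner.getTask().getId() + " as user " + ugi.getUserName());
  }
}

Such a hook would be enabled by setting hive.exec.pre.task.hooks to its fully qualified class name; getHookList() splits the value on commas, so several hook classes can be chained.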