Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1036323)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -256,8 +256,8 @@
     HIVEMAXMAPJOINSIZE("hive.mapjoin.maxsize", 100000),
     HIVEHASHTABLETHRESHOLD("hive.hashtable.initialCapacity", 100000),
     HIVEHASHTABLELOADFACTOR("hive.hashtable.loadfactor", (float) 0.75),
-    HIVEHASHTABLEMAXMEMORYUSAGE("hive.hashtable.max.memory.usage", (float) 0.90),
-    HIVEHASHTABLESCALE("hive.hashtable.scale", (long)100000),
+    HIVEHASHTABLEMAXMEMORYUSAGE("hive.mapjoin.localtask.max.memory.usage", (float) 0.90),
+    HIVEHASHTABLESCALE("hive.mapjoin.check.memory.rows", (long)100000),
     HIVEDEBUGLOCALTASK("hive.debug.localtask",false),
Index: ql/src/test/results/clientpositive/mapjoin_hook.q.out
===================================================================
--- ql/src/test/results/clientpositive/mapjoin_hook.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/mapjoin_hook.q.out	(revision 0)
@@ -0,0 +1,40 @@
+PREHOOK: query: drop table dest1
+PREHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE dest1(key INT, value STRING) STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+PREHOOK: query: INSERT OVERWRITE TABLE dest1
+SELECT /*+ MAPJOIN(x) */ x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@src1
+PREHOOK: Output: default@dest1
+[MapJoinCounter PostHook] Query ID: liyintang_20101123115757_2c161a2c-9b70-4262-bdb2-f64ea4af97c1 CONVERTED_LOCAL_MAPJOIN: 0 CONVERTED_MAPJOIN: 0 LOCAL_MAPJOIN: 1 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 0
+PREHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key = src3.key)
+INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1
+[MapJoinCounter PostHook] Query ID: liyintang_20101123115757_9f4519d1-9b80-4662-a3cd-56e5e0b3126f CONVERTED_LOCAL_MAPJOIN: 1 CONVERTED_MAPJOIN: 1 LOCAL_MAPJOIN: 0 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 0
+PREHOOK: query: FROM srcpart src1 JOIN src src2 ON (src1.key = src2.key)
+INSERT OVERWRITE TABLE dest1 SELECT src1.key, src2.value
+where (src1.ds = '2008-04-08' or src1.ds = '2008-04-09' )and (src1.hr = '12' or src1.hr = '11')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: Output: default@dest1
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.MapRedTask
+[MapJoinCounter PostHook] Query ID: liyintang_20101123115757_b4408471-9c74-4790-8093-e711dded68cd CONVERTED_LOCAL_MAPJOIN: 1 CONVERTED_MAPJOIN: 0 LOCAL_MAPJOIN: 0 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 1
+PREHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@dest1
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.MapRedTask
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.MapRedTask
+[MapJoinCounter PostHook] Query ID: liyintang_20101123115757_76a9fb0c-82b3-4c0c-8ce7-87eac11975ef CONVERTED_LOCAL_MAPJOIN: 2 CONVERTED_MAPJOIN: 0 LOCAL_MAPJOIN: 0 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 2
Index: ql/src/test/results/clientpositive/auto_join25.q.out
===================================================================
--- ql/src/test/results/clientpositive/auto_join25.q.out	(revision 1036323)
+++ ql/src/test/results/clientpositive/auto_join25.q.out	(working copy)
@@ -13,6 +13,8 @@
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
 PREHOOK: Output: default@dest1
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.MapRedTask
 POSTHOOK: query: FROM srcpart src1 JOIN src src2 ON (src1.key = src2.key)
 INSERT OVERWRITE TABLE dest1 SELECT src1.key, src2.value
 where (src1.ds = '2008-04-08' or src1.ds = '2008-04-09' )and (src1.hr = '12' or src1.hr = '11')
@@ -28,11 +30,11 @@
 PREHOOK: query: SELECT sum(hash(dest1.key,dest1.value)) FROM dest1
 PREHOOK: type: QUERY
 PREHOOK: Input: default@dest1
-PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-12_13-09-55_396_5931103673819276275/-mr-10000
+PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-23_11-55-57_329_3014183156590695165/-mr-10000
 POSTHOOK: query: SELECT sum(hash(dest1.key,dest1.value)) FROM dest1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@dest1
-POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-12_13-09-55_396_5931103673819276275/-mr-10000
+POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-23_11-55-57_329_3014183156590695165/-mr-10000
 POSTHOOK: Lineage: dest1.key EXPRESSION [(srcpart)src1.FieldSchema(name:key, type:string, comment:default), ]
 POSTHOOK: Lineage: dest1.value SIMPLE [(src)src2.FieldSchema(name:value, type:string, comment:default), ]
 407444119660
@@ -48,6 +50,10 @@
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@dest_j2
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.MapRedTask
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.MapRedTask
 POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
 INSERT OVERWRITE TABLE dest_j2 SELECT src1.key, src3.value
 POSTHOOK: type: QUERY
@@ -60,11 +66,11 @@
 PREHOOK: query: SELECT sum(hash(dest_j2.key,dest_j2.value)) FROM dest_j2
 PREHOOK: type: QUERY
 PREHOOK: Input: default@dest_j2
-PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-12_13-10-05_166_1509512648391049274/-mr-10000
+PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-23_11-56-17_043_4193831737882968189/-mr-10000
 POSTHOOK: query: SELECT sum(hash(dest_j2.key,dest_j2.value)) FROM dest_j2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@dest_j2
-POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-12_13-10-05_166_1509512648391049274/-mr-10000
+POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-23_11-56-17_043_4193831737882968189/-mr-10000
 POSTHOOK: Lineage: dest1.key EXPRESSION [(srcpart)src1.FieldSchema(name:key, type:string, comment:default), ]
 POSTHOOK: Lineage: dest1.value SIMPLE [(src)src2.FieldSchema(name:value, type:string, comment:default), ]
 POSTHOOK: Lineage: dest_j2.key EXPRESSION [(src)src1.FieldSchema(name:key, type:string, comment:default), ]
@@ -84,6 +90,8 @@
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@dest_j1
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.MapRedTask
 POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key)
 INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value
 POSTHOOK: type: QUERY
@@ -98,11 +106,11 @@
 PREHOOK: query: SELECT sum(hash(dest_j1.key,dest_j1.value)) FROM dest_j1
 PREHOOK: type: QUERY
 PREHOOK: Input: default@dest_j1
-PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-12_13-10-12_389_3216138172725864168/-mr-10000
+PREHOOK: Output: file:/tmp/liyintang/hive_2010-11-23_11-56-29_845_7186934150309598803/-mr-10000
 POSTHOOK: query: SELECT sum(hash(dest_j1.key,dest_j1.value)) FROM dest_j1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@dest_j1
-POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-12_13-10-12_389_3216138172725864168/-mr-10000
+POSTHOOK: Output: file:/tmp/liyintang/hive_2010-11-23_11-56-29_845_7186934150309598803/-mr-10000
 POSTHOOK: Lineage: dest1.key EXPRESSION [(srcpart)src1.FieldSchema(name:key, type:string, comment:default), ]
 POSTHOOK: Lineage: dest1.value SIMPLE [(src)src2.FieldSchema(name:value, type:string, comment:default), ]
 POSTHOOK: Lineage: dest_j1.key EXPRESSION [(src)src1.FieldSchema(name:key, type:string, comment:default), ]
Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java	(revision 1036323)
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/EnforceReadOnlyTables.java	(working copy)
@@ -31,9 +31,17 @@
  * Implementation of a pre execute hook that prevents modifications
  * of read-only tables used by the test framework
  */
-public class EnforceReadOnlyTables implements PreExecute {
+public class EnforceReadOnlyTables implements ExecuteWithHookContext {

   @Override
+  public void run(HookContext hookContext) throws Exception {
+    SessionState ss = SessionState.get();
+    Set<ReadEntity> inputs = hookContext.getInputs();
+    Set<WriteEntity> outputs = hookContext.getOutputs();
+    UserGroupInformation ugi = hookContext.getUgi();
+    this.run(ss, inputs, outputs, ugi);
+  }
+
   public void run(SessionState sess, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs, UserGroupInformation ugi)
       throws Exception {
Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java	(revision 1036323)
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java	(working copy)
@@ -38,7 +38,7 @@
  * Implementation of a post execute hook that simply prints out its parameters
  * to standard output.
  */
-public class PostExecutePrinter implements PostExecute {
+public class PostExecutePrinter implements ExecuteWithHookContext {

   public class DependencyKeyComp implements
       Comparator<Map.Entry<DependencyKey, Dependency>> {
@@ -94,6 +94,15 @@
   }

   @Override
+  public void run(HookContext hookContext) throws Exception {
+    SessionState ss = SessionState.get();
+    Set<ReadEntity> inputs = hookContext.getInputs();
+    Set<WriteEntity> outputs = hookContext.getOutputs();
+    LineageInfo linfo = hookContext.getLinfo();
+    UserGroupInformation ugi = hookContext.getUgi();
+    this.run(ss, inputs, outputs, linfo, ugi);
+  }
+
   public void run(SessionState sess, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs, LineageInfo linfo,
       UserGroupInformation ugi) throws Exception {
Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java	(revision 1036323)
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java	(working copy)
@@ -31,9 +31,17 @@
  * Implementation of a pre execute hook that simply prints out its parameters to
  * standard output.
  */
-public class PreExecutePrinter implements PreExecute {
+public class PreExecutePrinter implements ExecuteWithHookContext {

   @Override
+  public void run(HookContext hookContext) throws Exception {
+    SessionState ss = SessionState.get();
+    Set<ReadEntity> inputs = hookContext.getInputs();
+    Set<WriteEntity> outputs = hookContext.getOutputs();
+    UserGroupInformation ugi = hookContext.getUgi();
+    this.run(ss, inputs, outputs, ugi);
+  }
+
   public void run(SessionState sess, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs, UserGroupInformation ugi)
       throws Exception {
Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/MapJoinCounterHook.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/hooks/MapJoinCounterHook.java	(revision 0)
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/MapJoinCounterHook.java	(revision 0)
@@ -0,0 +1,58 @@
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+public class MapJoinCounterHook implements ExecuteWithHookContext {
+
+  public void run(HookContext hookContext) {
+    HiveConf conf = hookContext.getConf();
+    boolean enableConvert = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECONVERTJOIN);
+    if (!enableConvert) {
+      return;
+    }
+
+    QueryPlan plan = hookContext.getQueryPlan();
+    String queryID = plan.getQueryId();
+    // String query = SessionState.get().getCmd();
+
+    int convertedMapJoin = 0;
+    int commonJoin = 0;
+    int backupCommonJoin = 0;
+    int convertedLocalMapJoin = 0;
+    int localMapJoin = 0;
+
+    List<TaskRunner> list = hookContext.getCompleteTaskList();
+    for (TaskRunner tskRunner : list) {
+      Task tsk = tskRunner.getTask();
+      int tag = tsk.getTaskTag();
+      switch (tag) {
+      case Task.COMMON_JOIN:
+        commonJoin++;
+        break;
+      case Task.CONVERTED_LOCAL_MAPJOIN:
+        convertedLocalMapJoin++;
+        break;
+      case Task.CONVERTED_MAPJOIN:
+        convertedMapJoin++;
+        break;
+      case Task.BACKUP_COMMON_JOIN:
+        backupCommonJoin++;
+        break;
+      case Task.LOCAL_MAPJOIN:
+        localMapJoin++;
+        break;
+      }
+    }
+    LogHelper console = SessionState.getConsole();
+    console.printError("[MapJoinCounter PostHook] Query ID: " + queryID
+        + " CONVERTED_LOCAL_MAPJOIN: " + convertedLocalMapJoin
+        + " CONVERTED_MAPJOIN: " + convertedMapJoin + " LOCAL_MAPJOIN: " + localMapJoin
+        + " COMMON_JOIN: " + commonJoin + " BACKUP_COMMON_JOIN: " + backupCommonJoin);
+  }
+}
Index: ql/src/test/queries/clientpositive/mapjoin_hook.q
===================================================================
--- ql/src/test/queries/clientpositive/mapjoin_hook.q	(revision 0)
+++ ql/src/test/queries/clientpositive/mapjoin_hook.q	(revision 0)
@@ -0,0 +1,30 @@
+set hive.exec.post.hooks = org.apache.hadoop.hive.ql.hooks.MapJoinCounterHook ;
+drop table dest1;
+CREATE TABLE dest1(key INT, value STRING) STORED AS TEXTFILE;
+
+set hive.auto.convert.join = true;
+
+INSERT OVERWRITE TABLE dest1
+SELECT /*+ MAPJOIN(x) */ x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;
+
+
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key = src3.key)
+INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value;
+
+
+
+set hive.mapjoin.localtask.max.memory.usage = 0.0001;
+set hive.mapjoin.check.memory.rows = 2;
+
+
+FROM srcpart src1 JOIN src src2 ON (src1.key = src2.key)
+INSERT OVERWRITE TABLE dest1 SELECT src1.key, src2.value
+where (src1.ds = '2008-04-08' or src1.ds = '2008-04-09' )and (src1.hr = '12' or src1.hr = '11');
+
+
+FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
+INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value;
+
+
+
+
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java	(revision 1036323)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java	(working copy)
@@ -117,6 +117,12 @@
         currTask.setBackupChildrenTasks(null);
         currTask.setBackupTask(null);

+        if (currTask.getTaskTag() == Task.CONVERTED_MAPJOIN) {
+          localTask.setTaskTag(Task.CONVERTED_LOCAL_MAPJOIN);
+        } else {
+          localTask.setTaskTag(Task.LOCAL_MAPJOIN);
+        }
+
         //replace the map join operator to local_map_join operator in the operator tree
         //and return all the dummy parent
         List<Operator<? extends Serializable>> dummyOps = adjustLocalTask(localTask);
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java	(revision 1036323)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java	(working copy)
@@ -72,6 +72,8 @@
       if (joinOp == null) {
         return null;
       }
+      currTask.setTaskTag(Task.COMMON_JOIN);
+
       MapredWork currWork = currTask.getWork();
       // create conditional work list and task list
       List<Serializable> listWorks = new ArrayList<Serializable>();
@@ -129,6 +131,7 @@
         // add into conditional task
         listWorks.add(newWork);
         listTasks.add(newTask);
+        newTask.setTaskTag(Task.CONVERTED_MAPJOIN);

         //set up backup task
         newTask.setBackupTask(currTask);
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/Hook.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/Hook.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/Hook.java	(revision 0)
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+/**
+ *
+ * Hook. The interface that all the hooks in Hive implement.
+ *
+ */
+public interface Hook {
+
+}

Property changes on: ql/src/java/org/apache/hadoop/hive/ql/hooks/Hook.java
___________________________________________________________________
Added: svn:executable
   + *

Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/ExecuteWithHookContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/ExecuteWithHookContext.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/ExecuteWithHookContext.java	(revision 0)
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+
+public interface ExecuteWithHookContext extends Hook {
+
+  /**
+   *
+   * @param hookContext
+   *          The hook context passed to each hook.
+   * @throws Exception
+   */
+  void run(HookContext hookContext) throws Exception;
+
+}

Property changes on: ql/src/java/org/apache/hadoop/hive/ql/hooks/ExecuteWithHookContext.java
___________________________________________________________________
Added: svn:executable
   + *

Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java	(revision 0)
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Hook Context keeps all the necessary information for all the hooks.
+ * A newly implemented hook can get the query plan, the job configuration, and
+ * the list of all completed tasks from this hook context.
+ */
+public class HookContext {
+
+  private QueryPlan queryPlan;
+  private HiveConf conf;
+  private List<TaskRunner> completeTaskList;
+  private Set<ReadEntity> inputs;
+  private Set<WriteEntity> outputs;
+  private LineageInfo linfo;
+  private UserGroupInformation ugi;
+
+  public HookContext(QueryPlan queryPlan, HiveConf conf) throws Exception {
+    this.queryPlan = queryPlan;
+    this.conf = conf;
+    completeTaskList = new ArrayList<TaskRunner>();
+    inputs = queryPlan.getInputs();
+    outputs = queryPlan.getOutputs();
+    ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+    linfo = null;
+    if (SessionState.get() != null) {
+      linfo = SessionState.get().getLineageState().getLineageInfo();
+    }
+  }
+
+  public QueryPlan getQueryPlan() {
+    return queryPlan;
+  }
+
+  public void setQueryPlan(QueryPlan queryPlan) {
+    this.queryPlan = queryPlan;
+  }
+
+  public HiveConf getConf() {
+    return conf;
+  }
+
+  public void setConf(HiveConf conf) {
+    this.conf = conf;
+  }
+
+  public List<TaskRunner> getCompleteTaskList() {
+    return completeTaskList;
+  }
+
+  public void setCompleteTaskList(List<TaskRunner> completeTaskList) {
+    this.completeTaskList = completeTaskList;
+  }
+
+  public void addCompleteTask(TaskRunner completeTaskRunner) {
+    completeTaskList.add(completeTaskRunner);
+  }
+
+  public Set<ReadEntity> getInputs() {
+    return inputs;
+  }
+
+  public void setInputs(Set<ReadEntity> inputs) {
+    this.inputs = inputs;
+  }
+
+  public Set<WriteEntity> getOutputs() {
+    return outputs;
+  }
+
+  public void setOutputs(Set<WriteEntity> outputs) {
+    this.outputs = outputs;
+  }
+
+  public LineageInfo getLinfo() {
+    return linfo;
+  }
+
+  public void setLinfo(LineageInfo linfo) {
+    this.linfo = linfo;
+  }
+
+  public UserGroupInformation getUgi() {
+    return ugi;
+  }
+
+  public void setUgi(UserGroupInformation ugi) {
+    this.ugi = ugi;
+  }
+}

Property changes on: ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
___________________________________________________________________
Added: svn:executable
   + *

Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecute.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecute.java	(revision 1036323)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecute.java	(working copy)
@@ -27,7 +27,7 @@
 * The post execute hook interface. A list of such hooks can be configured to be
 * called after compilation and before execution.
 */
-public interface PostExecute {
+public interface PostExecute extends Hook {

  /**
   * The run command that is called just before the execution of the query.
@@ -43,6 +43,7 @@
   * @param ugi
   *          The user group security information.
   */
+  @Deprecated
  void run(SessionState sess, Set<ReadEntity> inputs,
      Set<WriteEntity> outputs, LineageInfo lInfo,
      UserGroupInformation ugi) throws Exception;
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java	(revision 1036323)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java	(working copy)
@@ -27,7 +27,7 @@
 * The pre execute hook interface. A list of such hooks can be configured to be
 * called after compilation and before execution.
 */
-public interface PreExecute {
+public interface PreExecute extends Hook {

  /**
   * The run command that is called just before the execution of the query.
@@ -41,6 +41,7 @@
   * @param ugi
   *          The user group security information.
   */
+  @Deprecated
  public void run(SessionState sess, Set<ReadEntity> inputs,
      Set<WriteEntity> outputs, UserGroupInformation ugi)
      throws Exception;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java	(revision 1036323)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java	(working copy)
@@ -253,9 +253,6 @@
     float hashTableMaxMemoryUsage = HiveConf.getFloatVar(hconf,
         HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
     hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE);
-    if (hashTableScale <= 0) {
-      hashTableScale = 1;
-    }

     // initialize the hash tables for other tables
     for (Byte pos : order) {
@@ -408,7 +405,7 @@
       bigBucketFileName = "-";
     }
     // get the tmp URI path; it will be a hdfs path if not local mode
-    String tmpURIPath = PathUtil.generatePath(tmpURI, tag, bigBucketFileName);
+    String tmpURIPath = Utilities.generatePath(tmpURI, tag, bigBucketFileName);
     hashTable.isAbort(rowNumber, console);
     console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
     // get the hashtable file and path
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java	(revision 1036323)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java	(working copy)
@@ -60,8 +60,16 @@
   protected transient boolean clonedConf = false;
   protected Task<? extends Serializable> backupTask;
   protected List<Task<? extends Serializable>> backupChildrenTasks = new ArrayList<Task<? extends Serializable>>();
+  protected int taskTag;
+  public static final int NO_TAG = 0;
+  public static final int COMMON_JOIN = 1;
+  public static final int CONVERTED_MAPJOIN = 2;
+  public static final int CONVERTED_LOCAL_MAPJOIN = 3;
+  public static final int BACKUP_COMMON_JOIN = 4;
+  public static final int LOCAL_MAPJOIN = 5;
+
   // Descendants tasks who subscribe feeds from this task
   protected transient List<Task<? extends Serializable>> feedSubscribers;
@@ -81,6 +89,7 @@
     queued = false;
     LOG = LogFactory.getLog(this.getClass().getName());
     this.taskCounters = new HashMap<String, Long>();
+    taskTag = Task.NO_TAG;
   }

   public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
@@ -186,6 +195,10 @@
       // recursively remove task from its children tasks if this task doesn't have any parent task
       this.removeFromChildrenTasks();
+      // set task tag
+      if (this.taskTag == Task.CONVERTED_LOCAL_MAPJOIN) {
+        backupTask.setTaskTag(Task.BACKUP_COMMON_JOIN);
+      }
     }
     return backupTask;
   }
@@ -445,4 +458,13 @@
       conf = new HiveConf(conf);
     }
   }
+
+
+  public int getTaskTag() {
+    return taskTag;
+  }
+
+  public void setTaskTag(int taskTag) {
+    this.taskTag = taskTag;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java	(revision 1036323)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java	(working copy)
@@ -161,7 +161,6 @@
     int size = mHash.size();
     long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
     double rate = (double) usedMemory / (double) maxMemory;
-    long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
     console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
         + size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate));
     if (rate > (double) maxMemoryUsage) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java	(revision 1036323)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java	(working copy)
@@ -1,20 +0,0 @@
-package org.apache.hadoop.hive.ql.exec;
-
-import org.apache.hadoop.fs.Path;
-
-public class PathUtil {
-  public static String suffix=".hashtable";
-  public static String generatePath(String baseURI,Byte tag,String bigBucketFileName){
-    String path = new String(baseURI+Path.SEPARATOR+"-"+tag+"-"+bigBucketFileName+suffix);
-    return path;
-  }
-  public static String generateFileName(Byte tag,String bigBucketFileName){
-    String fileName = new String("-"+tag+"-"+bigBucketFileName+suffix);
-    return fileName;
-  }
-
-  public static String generateTmpURI(String baseURI,String id){
-    String tmpFileURI = new String(baseURI+Path.SEPARATOR+"HashTable-"+id);
-    return tmpFileURI;
-  }
-}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 1036323)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -591,7 +591,7 @@
         FileStatus file = hashtableFiles[i];
         Path path = file.getPath();
         String fileName = path.getName();
-        String hdfsFile = hdfsPath + Path.SEPARATOR + fileName;
+        String hdfsFile = Utilities.generatePath(hdfsPath, fileName);
         LOG.info("Upload 1 HashTable from" + path + " to: " + hdfsFile);
         Path hdfsFilePath = new Path(hdfsFile);
@@ -604,8 +604,7 @@
       for (int i = 0; i < hashtableRemoteFiles.length; i++) {
         FileStatus file = hashtableRemoteFiles[i];
         Path path = file.getPath();
-        DistributedCache.addCacheFile(path.toUri(), job);
-
+        DistributedCache.addCacheArchive(path.toUri(), job);
         LOG.info("add 1 hashtable file to distributed cache: " + path.toUri());
       }
     }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 1036323)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -1582,6 +1582,11 @@
     return fileName;
   }

+  public static String generatePath(Path baseURI, String filename) {
+    String path = new String(baseURI + Path.SEPARATOR + filename);
+    return path;
+  }
+
   public static String generateTmpURI(String baseURI, String id) {
     String tmpFileURI = new String(baseURI + Path.SEPARATOR + "HashTable-" + id);
     return tmpFileURI;
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(revision 1036323)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java	(working copy)
@@ -55,6 +55,9 @@
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
+import org.apache.hadoop.hive.ql.hooks.Hook;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.PostExecute;
 import org.apache.hadoop.hive.ql.hooks.PreExecute;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -702,8 +705,8 @@
     return new CommandProcessorResponse(ret);
   }

-  private List<PreExecute> getPreExecHooks() throws Exception {
-    ArrayList<PreExecute> pehooks = new ArrayList<PreExecute>();
+  private List<Hook> getPreExecHooks() throws Exception {
+    ArrayList<Hook> pehooks = new ArrayList<Hook>();
     String pestr = conf.getVar(HiveConf.ConfVars.PREEXECHOOKS);
     pestr = pestr.trim();
     if (pestr.equals("")) {
@@ -714,7 +717,7 @@

     for (String peClass : peClasses) {
       try {
-        pehooks.add((PreExecute) Class.forName(peClass.trim(), true, JavaUtils.getClassLoader())
+        pehooks.add((Hook) Class.forName(peClass.trim(), true, JavaUtils.getClassLoader())
             .newInstance());
       } catch (ClassNotFoundException e) {
         console.printError("Pre Exec Hook Class not found:" + e.getMessage());
@@ -725,8 +728,8 @@
     return pehooks;
   }

-  private List<PostExecute> getPostExecHooks() throws Exception {
-    ArrayList<PostExecute> pehooks = new ArrayList<PostExecute>();
+  private List<Hook> getPostExecHooks() throws Exception {
+    ArrayList<Hook> pehooks = new ArrayList<Hook>();
     String pestr = conf.getVar(HiveConf.ConfVars.POSTEXECHOOKS);
     pestr = pestr.trim();
     if (pestr.equals("")) {
@@ -737,7 +740,7 @@

     for (String peClass : peClasses) {
       try {
-        pehooks.add((PostExecute) Class.forName(peClass.trim(), true, JavaUtils.getClassLoader())
+        pehooks.add((Hook) Class.forName(peClass.trim(), true, JavaUtils.getClassLoader())
             .newInstance());
       } catch (ClassNotFoundException e) {
         console.printError("Post Exec Hook Class not found:" + e.getMessage());
@@ -773,12 +776,18 @@
     }
     resStream = null;

-    // Get all the pre execution hooks and execute them.
-    for (PreExecute peh : getPreExecHooks()) {
-      peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(), ShimLoader
-          .getHadoopShims().getUGIForConf(conf));
+    HookContext hookContext = new HookContext(plan, conf);
+
+    for (Hook peh : getPreExecHooks()) {
+      if (peh instanceof ExecuteWithHookContext) {
+        ((ExecuteWithHookContext) peh).run(hookContext);
+      } else if (peh instanceof PreExecute) {
+        ((PreExecute) peh).run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
+            ShimLoader.getHadoopShims().getUGIForConf(conf));
+      }
     }
+
     int jobs = Utilities.getMRTasks(plan.getRootTasks()).size();
     if (jobs > 0) {
       console.printInfo("Total MapReduce jobs = " + jobs);
@@ -820,6 +829,7 @@
         TaskResult tskRes = pollTasks(running.keySet());
         TaskRunner tskRun = running.remove(tskRes);
         Task<? extends Serializable> tsk = tskRun.getTask();
+        hookContext.addCompleteTask(tskRun);

         int exitVal = tskRes.getExitVal();
         if (exitVal != 0) {
@@ -827,10 +837,10 @@
           if (backupTask != null) {
             errorMessage = "FAILED: Execution Error, return code " + exitVal + " from "
                 + tsk.getClass().getName();
-            console.printInfo(errorMessage);
+            console.printError(errorMessage);
             errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
-            console.printInfo(errorMessage);
+            console.printError(errorMessage);

             // add backup task to runnable
             if (DriverContext.isLaunchable(backupTask)) {
@@ -885,12 +895,17 @@
       }

       // Get all the post execution hooks and execute them.
-      for (PostExecute peh : getPostExecHooks()) {
-        peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
-            (SessionState.get() != null ? SessionState.get().getLineageState().getLineageInfo()
-                : null), ShimLoader.getHadoopShims().getUGIForConf(conf));
+      for (Hook peh : getPostExecHooks()) {
+        if (peh instanceof ExecuteWithHookContext) {
+          ((ExecuteWithHookContext) peh).run(hookContext);
+        } else if (peh instanceof PostExecute) {
+          ((PostExecute) peh).run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
+              (SessionState.get() != null ? SessionState.get().getLineageState().getLineageInfo()
+                  : null), ShimLoader.getHadoopShims().getUGIForConf(conf));
+        }
       }
+
       if (SessionState.get() != null) {
         SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
             String.valueOf(0));
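
For reference, a minimal sketch of a third-party hook written against the interfaces this patch adds (ExecuteWithHookContext, HookContext.getCompleteTaskList(), Task.getTaskTag()). The class name ExampleTaskTagHook is hypothetical and is not part of the patch; only the Hive calls it makes come from the code above.

    package org.apache.hadoop.hive.ql.hooks;

    import java.io.Serializable;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskRunner;
    import org.apache.hadoop.hive.ql.session.SessionState;
    import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;

    // Hypothetical post-execution hook: prints the task tag of every completed
    // task, using only the accessors introduced by this patch.
    public class ExampleTaskTagHook implements ExecuteWithHookContext {

      @Override
      public void run(HookContext hookContext) throws Exception {
        LogHelper console = SessionState.getConsole();
        List<TaskRunner> completed = hookContext.getCompleteTaskList();
        for (TaskRunner runner : completed) {
          Task<? extends Serializable> tsk = runner.getTask();
          // Tags are the int constants added to Task in this patch (NO_TAG,
          // COMMON_JOIN, CONVERTED_MAPJOIN, CONVERTED_LOCAL_MAPJOIN,
          // BACKUP_COMMON_JOIN, LOCAL_MAPJOIN).
          console.printInfo("Task " + tsk.getId() + " completed with tag " + tsk.getTaskTag());
        }
      }
    }

Such a hook would be registered the same way MapJoinCounterHook is in mapjoin_hook.q:

    set hive.exec.post.hooks = org.apache.hadoop.hive.ql.hooks.ExampleTaskTagHook;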