Index: ql/src/test/results/clientpositive/mapjoin_hook.q.out =================================================================== --- ql/src/test/results/clientpositive/mapjoin_hook.q.out (revision 0) +++ ql/src/test/results/clientpositive/mapjoin_hook.q.out (revision 0) @@ -0,0 +1,40 @@ +PREHOOK: query: drop table dest1 +PREHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE dest1(key INT, value STRING) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: query: INSERT OVERWRITE TABLE dest1 +SELECT /*+ MAPJOIN(x) */ x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src1 +PREHOOK: Output: default@dest1 +[MapJoinCounter PostHook] CONVERTED_LOCAL_MAPJOIN: 0 CONVERTED_MAPJOIN: 0 LOCAL_MAPJOIN: 1 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 0 +PREHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key = src3.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@dest1 +[MapJoinCounter PostHook] CONVERTED_LOCAL_MAPJOIN: 1 CONVERTED_MAPJOIN: 1 LOCAL_MAPJOIN: 0 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 0 +PREHOOK: query: FROM srcpart src1 JOIN src src2 ON (src1.key = src2.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src2.value +where (src1.ds = '2008-04-08' or src1.ds = '2008-04-09' )and (src1.hr = '12' or src1.hr = '11') +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +PREHOOK: Output: default@dest1 +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask +ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.MapRedTask +[MapJoinCounter PostHook] CONVERTED_LOCAL_MAPJOIN: 1 CONVERTED_MAPJOIN: 0 LOCAL_MAPJOIN: 0 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 1 +PREHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@dest1 +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask +ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.MapRedTask +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapredLocalTask +ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.MapRedTask +[MapJoinCounter PostHook] CONVERTED_LOCAL_MAPJOIN: 2 CONVERTED_MAPJOIN: 0 LOCAL_MAPJOIN: 0 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 2 Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/MapJoinCounterHook.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/hooks/MapJoinCounterHook.java (revision 0) +++ ql/src/test/org/apache/hadoop/hive/ql/hooks/MapJoinCounterHook.java (revision 0) @@ -0,0 +1,58 @@ +package org.apache.hadoop.hive.ql.hooks; + +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskRunner; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; + +public class MapJoinCounterHook implements ExecuteWithHookContext { + + public void run(HookContext hookContext) { + HiveConf conf = hookContext.getConf(); + boolean enableConvert = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECONVERTJOIN); + if (!enableConvert) { + return; + } + + QueryPlan plan = hookContext.getQueryPlan(); + String queryID = plan.getQueryId(); + // String query = SessionState.get().getCmd(); + + int convertedMapJoin = 0; + int commonJoin = 0; + int backupCommonJoin = 0; + int convertedLocalMapJoin = 0; + int localMapJoin = 0; + + List list = hookContext.getCompleteTaskList(); + for (TaskRunner tskRunner : list) { + Task tsk = tskRunner.getTask(); + int tag = tsk.getTaskTag(); + switch (tag) { + case Task.COMMON_JOIN: + commonJoin++; + break; + case Task.CONVERTED_LOCAL_MAPJOIN: + convertedLocalMapJoin++; + break; + case Task.CONVERTED_MAPJOIN: + convertedMapJoin++; + break; + case Task.BACKUP_COMMON_JOIN: + backupCommonJoin++; + break; + case Task.LOCAL_MAPJOIN: + localMapJoin++; + break; + } + } + LogHelper console = SessionState.getConsole(); + console.printError("[MapJoinCounter PostHook] CONVERTED_LOCAL_MAPJOIN: " + convertedLocalMapJoin + + " CONVERTED_MAPJOIN: " + convertedMapJoin + " LOCAL_MAPJOIN: "+localMapJoin+ " COMMON_JOIN: "+commonJoin + + " BACKUP_COMMON_JOIN: " + backupCommonJoin); + } +} Index: ql/src/test/queries/clientpositive/mapjoin_hook.q =================================================================== --- ql/src/test/queries/clientpositive/mapjoin_hook.q (revision 0) +++ ql/src/test/queries/clientpositive/mapjoin_hook.q (revision 0) @@ -0,0 +1,30 @@ +set hive.exec.post.hooks = org.apache.hadoop.hive.ql.hooks.MapJoinCounterHook ; +drop table dest1; +CREATE TABLE dest1(key INT, value STRING) STORED AS TEXTFILE; + +set hive.auto.convert.join = true; + +INSERT OVERWRITE TABLE dest1 +SELECT /*+ MAPJOIN(x) */ x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key; + + +FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key = src3.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value; + + + +set hive.mapjoin.localtask.max.memory.usage = 0.0001; +set hive.mapjoin.check.memory.rows = 2; + + +FROM srcpart src1 JOIN src src2 ON (src1.key = src2.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src2.value +where (src1.ds = '2008-04-08' or src1.ds = '2008-04-09' )and (src1.hr = '12' or src1.hr = '11'); + + +FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value; + + + + Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (revision 1038746) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (working copy) @@ -117,6 +117,12 @@ currTask.setBackupChildrenTasks(null); currTask.setBackupTask(null); + if(currTask.getTaskTag() == Task.CONVERTED_MAPJOIN) { + localTask.setTaskTag(Task.CONVERTED_LOCAL_MAPJOIN); + } else { + localTask.setTaskTag(Task.LOCAL_MAPJOIN); + } + //replace the map join operator to local_map_join operator in the operator tree //and return all the dummy parent List> dummyOps= adjustLocalTask(localTask); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (revision 1038746) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (working copy) @@ -72,6 +72,8 @@ if (joinOp == null) { return null; } + currTask.setTaskTag(Task.COMMON_JOIN); + MapredWork currWork = currTask.getWork(); // create conditional work list and task list List listWorks = new ArrayList(); @@ -129,6 +131,7 @@ // add into conditional task listWorks.add(newWork); listTasks.add(newTask); + newTask.setTaskTag(Task.CONVERTED_MAPJOIN); //set up backup task newTask.setBackupTask(currTask); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (revision 1038746) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (working copy) @@ -60,8 +60,16 @@ protected transient boolean clonedConf = false; protected Task backupTask; protected List> backupChildrenTasks = new ArrayList>(); + protected int taskTag; + public static final int NO_TAG = 0; + public static final int COMMON_JOIN = 1; + public static final int CONVERTED_MAPJOIN = 2; + public static final int CONVERTED_LOCAL_MAPJOIN = 3; + public static final int BACKUP_COMMON_JOIN = 4; + public static final int LOCAL_MAPJOIN=5; + // Descendants tasks who subscribe feeds from this task protected transient List> feedSubscribers; @@ -81,6 +89,7 @@ queued = false; LOG = LogFactory.getLog(this.getClass().getName()); this.taskCounters = new HashMap(); + taskTag = Task.NO_TAG; } public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) { @@ -445,4 +454,13 @@ conf = new HiveConf(conf); } } + + + public int getTaskTag() { + return taskTag; + } + + public void setTaskTag(int taskTag) { + this.taskTag = taskTag; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java (revision 1038746) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java (working copy) @@ -97,7 +97,13 @@ resTsks.add(ctx.getCommonJoinTask()); } else { // run the map join task - resTsks.add(ctx.getAliasToTask().get(bigTableAlias)); + Task task = ctx.getAliasToTask().get(bigTableAlias); + //set task tag + if(task.getTaskTag() == Task.CONVERTED_LOCAL_MAPJOIN) { + task.getBackupTask().setTaskTag(Task.BACKUP_COMMON_JOIN); + } + resTsks.add(task); + } return resTsks;