Index: ql/src/test/results/clientpositive/mapjoin_hook.q.out =================================================================== --- ql/src/test/results/clientpositive/mapjoin_hook.q.out (revision 0) +++ ql/src/test/results/clientpositive/mapjoin_hook.q.out (revision 0) @@ -0,0 +1,34 @@ +PREHOOK: query: drop table dest1 +PREHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE dest1(key INT, value STRING) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: query: INSERT OVERWRITE TABLE dest1 +SELECT /*+ MAPJOIN(x) */ x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src1 +PREHOOK: Output: default@dest1 +[MapJoinCounter PostHook] CONVERTED_LOCAL_MAPJOIN: 0 CONVERTED_MAPJOIN: 0 LOCAL_MAPJOIN: 1 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 0 +PREHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key = src3.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@dest1 +[MapJoinCounter PostHook] CONVERTED_LOCAL_MAPJOIN: 1 CONVERTED_MAPJOIN: 1 LOCAL_MAPJOIN: 0 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 0 +PREHOOK: query: FROM srcpart src1 JOIN src src2 ON (src1.key = src2.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src2.value +where (src1.ds = '2008-04-08' or src1.ds = '2008-04-09' )and (src1.hr = '12' or src1.hr = '11') +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +PREHOOK: Output: default@dest1 +[MapJoinCounter PostHook] CONVERTED_LOCAL_MAPJOIN: 1 CONVERTED_MAPJOIN: 0 LOCAL_MAPJOIN: 0 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 1 +PREHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@dest1 +[MapJoinCounter PostHook] CONVERTED_LOCAL_MAPJOIN: 2 CONVERTED_MAPJOIN: 0 LOCAL_MAPJOIN: 0 COMMON_JOIN: 0 BACKUP_COMMON_JOIN: 2 Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/MapJoinCounterHook.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/hooks/MapJoinCounterHook.java (revision 0) +++ ql/src/test/org/apache/hadoop/hive/ql/hooks/MapJoinCounterHook.java (revision 0) @@ -0,0 +1,58 @@ +package org.apache.hadoop.hive.ql.hooks; + +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskRunner; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; + +public class MapJoinCounterHook implements ExecuteWithHookContext { + + public void run(HookContext hookContext) { + HiveConf conf = hookContext.getConf(); + boolean enableConvert = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECONVERTJOIN); + if (!enableConvert) { + return; + } + + QueryPlan plan = hookContext.getQueryPlan(); + String queryID = plan.getQueryId(); + // String query = SessionState.get().getCmd(); + + int convertedMapJoin = 0; + int commonJoin = 0; + int backupCommonJoin = 0; + int convertedLocalMapJoin = 0; + int localMapJoin = 0; + + List list = hookContext.getCompleteTaskList(); + for (TaskRunner tskRunner : list) { + Task tsk = tskRunner.getTask(); + int tag = tsk.getTaskTag(); + switch (tag) { + case Task.COMMON_JOIN: + commonJoin++; + break; + case Task.CONVERTED_LOCAL_MAPJOIN: + convertedLocalMapJoin++; + break; + case Task.CONVERTED_MAPJOIN: + convertedMapJoin++; + break; + case Task.BACKUP_COMMON_JOIN: + backupCommonJoin++; + break; + case Task.LOCAL_MAPJOIN: + localMapJoin++; + break; + } + } + LogHelper console = SessionState.getConsole(); + console.printError("[MapJoinCounter PostHook] CONVERTED_LOCAL_MAPJOIN: " + convertedLocalMapJoin + + " CONVERTED_MAPJOIN: " + convertedMapJoin + " LOCAL_MAPJOIN: "+localMapJoin+ " COMMON_JOIN: "+commonJoin + + " BACKUP_COMMON_JOIN: " + backupCommonJoin); + } +} Index: ql/src/test/queries/clientpositive/mapjoin_hook.q =================================================================== --- ql/src/test/queries/clientpositive/mapjoin_hook.q (revision 0) +++ ql/src/test/queries/clientpositive/mapjoin_hook.q (revision 0) @@ -0,0 +1,30 @@ +set hive.exec.post.hooks = org.apache.hadoop.hive.ql.hooks.MapJoinCounterHook ; +drop table dest1; +CREATE TABLE dest1(key INT, value STRING) STORED AS TEXTFILE; + +set hive.auto.convert.join = true; + +INSERT OVERWRITE TABLE dest1 +SELECT /*+ MAPJOIN(x) */ x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key; + + +FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key = src3.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value; + + + +set hive.mapjoin.localtask.max.memory.usage = 0.0001; +set hive.mapjoin.check.memory.rows = 2; + + +FROM srcpart src1 JOIN src src2 ON (src1.key = src2.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src2.value +where (src1.ds = '2008-04-08' or src1.ds = '2008-04-09' )and (src1.hr = '12' or src1.hr = '11'); + + +FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key) +INSERT OVERWRITE TABLE dest1 SELECT src1.key, src3.value; + + + + Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (revision 1038464) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (working copy) @@ -117,6 +117,12 @@ currTask.setBackupChildrenTasks(null); currTask.setBackupTask(null); + if(currTask.getTaskTag() == Task.CONVERTED_MAPJOIN) { + localTask.setTaskTag(Task.CONVERTED_LOCAL_MAPJOIN); + } else { + localTask.setTaskTag(Task.LOCAL_MAPJOIN); + } + //replace the map join operator to local_map_join operator in the operator tree //and return all the dummy parent List> dummyOps= adjustLocalTask(localTask); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (revision 1038464) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (working copy) @@ -72,6 +72,8 @@ if (joinOp == null) { return null; } + currTask.setTaskTag(Task.COMMON_JOIN); + MapredWork currWork = currTask.getWork(); // create conditional work list and task list List listWorks = new ArrayList(); @@ -129,6 +131,7 @@ // add into conditional task listWorks.add(newWork); listTasks.add(newTask); + newTask.setTaskTag(Task.CONVERTED_MAPJOIN); //set up backup task newTask.setBackupTask(currTask); Property changes on: ql/src/java/org/apache/hadoop/hive/ql/hooks/Hook.java ___________________________________________________________________ Added: svn:executable + * Property changes on: ql/src/java/org/apache/hadoop/hive/ql/hooks/ExecuteWithHookContext.java ___________________________________________________________________ Added: svn:executable + * Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java (revision 1038464) +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java (working copy) @@ -116,5 +116,4 @@ this.ugi = ugi; } - } Property changes on: ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java ___________________________________________________________________ Added: svn:executable + * Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (revision 1038464) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (working copy) @@ -253,9 +253,6 @@ float hashTableMaxMemoryUsage = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE); hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE); - if (hashTableScale <= 0) { - hashTableScale = 1; - } // initialize the hash tables for other tables for (Byte pos : order) { @@ -408,7 +405,7 @@ bigBucketFileName = "-"; } // get the tmp URI path; it will be a hdfs path if not local mode - String tmpURIPath = PathUtil.generatePath(tmpURI, tag, bigBucketFileName); + String tmpURIPath = Utilities.generatePath(tmpURI, tag, bigBucketFileName); hashTable.isAbort(rowNumber, console); console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath); // get the hashtable file and path Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (revision 1038464) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (working copy) @@ -60,8 +60,16 @@ protected transient boolean clonedConf = false; protected Task backupTask; protected List> backupChildrenTasks = new ArrayList>(); + protected int taskTag; + public static final int NO_TAG = 0; + public static final int COMMON_JOIN = 1; + public static final int CONVERTED_MAPJOIN = 2; + public static final int CONVERTED_LOCAL_MAPJOIN = 3; + public static final int BACKUP_COMMON_JOIN = 4; + public static final int LOCAL_MAPJOIN=5; + // Descendants tasks who subscribe feeds from this task protected transient List> feedSubscribers; @@ -81,6 +89,7 @@ queued = false; LOG = LogFactory.getLog(this.getClass().getName()); this.taskCounters = new HashMap(); + taskTag = Task.NO_TAG; } public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) { @@ -186,6 +195,10 @@ // recursively remove task from its children tasks if this task doesn't have any parent task this.removeFromChildrenTasks(); + //set task tag + if(this.taskTag == Task.CONVERTED_LOCAL_MAPJOIN) { + backupTask.setTaskTag(Task.BACKUP_COMMON_JOIN); + } } return backupTask; } @@ -445,4 +458,13 @@ conf = new HiveConf(conf); } } + + + public int getTaskTag() { + return taskTag; + } + + public void setTaskTag(int taskTag) { + this.taskTag = taskTag; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (revision 1038464) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (working copy) @@ -161,7 +161,6 @@ int size = mHash.size(); long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); double rate = (double) usedMemory / (double) maxMemory; - long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t" + size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate)); if (rate > (double) maxMemoryUsage) { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java (revision 1038464) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/PathUtil.java (working copy) @@ -1,20 +0,0 @@ -package org.apache.hadoop.hive.ql.exec; - -import org.apache.hadoop.fs.Path; - -public class PathUtil { - public static String suffix=".hashtable"; - public static String generatePath(String baseURI,Byte tag,String bigBucketFileName){ - String path = new String(baseURI+Path.SEPARATOR+"-"+tag+"-"+bigBucketFileName+suffix); - return path; - } - public static String generateFileName(Byte tag,String bigBucketFileName){ - String fileName = new String("-"+tag+"-"+bigBucketFileName+suffix); - return fileName; - } - - public static String generateTmpURI(String baseURI,String id){ - String tmpFileURI = new String(baseURI+Path.SEPARATOR+"HashTable-"+id); - return tmpFileURI; - } -} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1038464) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy) @@ -591,7 +591,7 @@ FileStatus file = hashtableFiles[i]; Path path = file.getPath(); String fileName = path.getName(); - String hdfsFile = hdfsPath + Path.SEPARATOR + fileName; + String hdfsFile = Utilities.generatePath(hdfsPath, fileName); LOG.info("Upload 1 HashTable from" + path + " to: " + hdfsFile); Path hdfsFilePath = new Path(hdfsFile); @@ -604,8 +604,7 @@ for (int i = 0; i < hashtableRemoteFiles.length; i++) { FileStatus file = hashtableRemoteFiles[i]; Path path = file.getPath(); - DistributedCache.addCacheFile(path.toUri(), job); - + DistributedCache.addCacheArchive(path.toUri(), job); LOG.info("add 1 hashtable file to distributed cache: " + path.toUri()); } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1038464) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -1582,6 +1582,11 @@ return fileName; } + public static String generatePath(Path baseURI, String filename) { + String path = new String(baseURI + Path.SEPARATOR + filename); + return path; + } + public static String generateTmpURI(String baseURI, String id) { String tmpFileURI = new String(baseURI + Path.SEPARATOR + "HashTable-" + id); return tmpFileURI;