Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1042768)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -202,6 +202,8 @@
     HIVEMAPJOINCACHEROWS("hive.mapjoin.cache.numrows", 25000),
     HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000),
     HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.5),
+    HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY("hive.mapjoin.followby.map.aggr.hash.percentmemory", (float) 0.3),
+    HIVEMAPAGGRMEMORYTHRESHOLD("hive.map.aggr.hash.force.flush.memory.threshold", (float) 0.9),
     HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5),
 
     // for hive udtf operator
@@ -256,6 +258,7 @@
     HIVEMAXMAPJOINSIZE("hive.mapjoin.maxsize", 100000),
     HIVEHASHTABLETHRESHOLD("hive.hashtable.initialCapacity", 100000),
     HIVEHASHTABLELOADFACTOR("hive.hashtable.loadfactor", (float) 0.75),
+    HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE("hive.mapjoin.followby.gby.localtask.max.memory.usage", (float) 0.55),
     HIVEHASHTABLEMAXMEMORYUSAGE("hive.mapjoin.localtask.max.memory.usage", (float) 0.90),
     HIVEHASHTABLESCALE("hive.mapjoin.check.memory.rows", (long)100000),
 
@@ -315,7 +318,7 @@
     HIVEFETCHOUTPUTSERDE("hive.fetch.output.serde", "org.apache.hadoop.hive.serde2.DelimitedJSONSerDe"),
 
     SEMANTIC_ANALYZER_HOOK("hive.semantic.analyzer.hook",null),
-
+    // Print column names in output
     HIVE_CLI_PRINT_HEADER("hive.cli.print.header", false);
     ;
 
Index: ql/src/test/results/clientpositive/auto_join26.q.out
===================================================================
--- ql/src/test/results/clientpositive/auto_join26.q.out	(revision 0)
+++ ql/src/test/results/clientpositive/auto_join26.q.out	(revision 0)
@@ -0,0 +1,315 @@
+PREHOOK: query: CREATE TABLE dest_j1(key INT, cnt INT)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE dest_j1(key INT, cnt INT)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@dest_j1
+PREHOOK: query: EXPLAIN
+INSERT OVERWRITE TABLE dest_j1
+SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+INSERT OVERWRITE TABLE dest_j1
+SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF src1 x) (TOK_TABREF src y) (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL y) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB dest_j1)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL x) key)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (. 
(TOK_TABLE_OR_COL x) key)))) + +STAGE DEPENDENCIES: + Stage-7 is a root stage , consists of Stage-8, Stage-9, Stage-1 + Stage-8 has a backup stage: Stage-1 + Stage-5 depends on stages: Stage-8 + Stage-2 depends on stages: Stage-1, Stage-5, Stage-6 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + Stage-9 has a backup stage: Stage-1 + Stage-6 depends on stages: Stage-9 + Stage-1 + +STAGE PLANS: + Stage: Stage-7 + Conditional Operator + + Stage: Stage-8 + Map Reduce Local Work + Alias -> Map Local Tables: + y + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + y + TableScan + alias: y + HashTable Sink Operator + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 0 + + Stage: Stage-5 + Map Reduce + Alias -> Map Operator Tree: + x + TableScan + alias: x + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0 + Position of Big Table: 0 + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: _col0 + type: string + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + Local Work: + Map Reduce Local Work + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + file:/tmp/liyintang/hive_2010-12-06_15-13-36_873_2449071063337197304/-mr-10002 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + Map-reduce partition columns: + expr: _col0 + type: string + tag: -1 + value expressions: + expr: _col1 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: bigint + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: UDFToInteger(_col0) + type: int + expr: UDFToInteger(_col1) + type: int + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: dest_j1 + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: dest_j1 + + Stage: Stage-3 + Stats-Aggr Operator + + Stage: Stage-9 + Map Reduce Local Work + Alias -> Map Local Tables: + x + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + x + TableScan + alias: x + HashTable Sink Operator + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 1 + + Stage: Stage-6 + Map Reduce + Alias -> Map Operator Tree: + y + TableScan + alias: y + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: 
false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0 + Position of Big Table: 1 + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: _col0 + type: string + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + Local Work: + Map Reduce Local Work + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + x + TableScan + alias: x + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + tag: 0 + value expressions: + expr: key + type: string + y + TableScan + alias: y + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} + 1 + handleSkewJoin: false + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: _col0 + type: string + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + +PREHOOK: query: INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src1 +PREHOOK: Output: default@dest_j1 +POSTHOOK: query: INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@src1 +POSTHOOK: Output: default@dest_j1 +POSTHOOK: Lineage: dest_j1.cnt EXPRESSION [(src1)x.null, (src)y.null, ] +POSTHOOK: Lineage: dest_j1.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: select * from dest_j1 x order by x.key +PREHOOK: type: QUERY +PREHOOK: Input: default@dest_j1 +PREHOOK: Output: file:/tmp/liyintang/hive_2010-12-06_15-13-51_429_1293784409064103219/-mr-10000 +POSTHOOK: query: select * from dest_j1 x order by x.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest_j1 +POSTHOOK: Output: file:/tmp/liyintang/hive_2010-12-06_15-13-51_429_1293784409064103219/-mr-10000 +POSTHOOK: Lineage: dest_j1.cnt EXPRESSION [(src1)x.null, (src)y.null, ] +POSTHOOK: Lineage: dest_j1.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ] +66 1 +98 2 +128 3 +146 2 +150 1 +213 2 +224 2 +238 2 +255 2 +273 3 +278 2 +311 3 +369 3 +401 5 +406 4 Index: ql/src/test/queries/clientpositive/auto_join26.q =================================================================== --- ql/src/test/queries/clientpositive/auto_join26.q (revision 0) +++ ql/src/test/queries/clientpositive/auto_join26.q (revision 0) @@ -0,0 +1,10 @@ +CREATE TABLE dest_j1(key INT, cnt INT); +set hive.auto.convert.join = true; +EXPLAIN +INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by 
x.key; + +INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key; + +select * from dest_j1 x order by x.key; Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (working copy) @@ -55,31 +55,32 @@ import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx; /** - * An implementation of PhysicalPlanResolver. It iterator each MapRedTask to see whether the task has a local map work - * if it has, it will move the local work to a new local map join task. Then it will make this new generated task depends on - * current task's parent task and make current task depends on this new generated task. + * An implementation of PhysicalPlanResolver. It iterator each MapRedTask to see whether the task + * has a local map work if it has, it will move the local work to a new local map join task. Then it + * will make this new generated task depends on current task's parent task and make current task + * depends on this new generated task. */ public class MapJoinResolver implements PhysicalPlanResolver { @Override public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { - //create dispatcher and graph walker + // create dispatcher and graph walker Dispatcher disp = new LocalMapJoinTaskDispatcher(pctx); TaskGraphWalker ogw = new TaskGraphWalker(disp); - //get all the tasks nodes from root task + // get all the tasks nodes from root task ArrayList topNodes = new ArrayList(); topNodes.addAll(pctx.rootTasks); - //begin to walk through the task tree. + // begin to walk through the task tree. ogw.startWalking(topNodes, null); return pctx; } /** - * Iterator each tasks. If this task has a local work,create a new task for this local work, named MapredLocalTask. - * then make this new generated task depends on current task's parent task, and make current task - * depends on this new generated task + * Iterator each tasks. If this task has a local work,create a new task for this local work, named + * MapredLocalTask. then make this new generated task depends on current task's parent task, and + * make current task depends on this new generated task */ class LocalMapJoinTaskDispatcher implements Dispatcher { @@ -91,195 +92,160 @@ } private void processCurrentTask(Task currTask, - ConditionalTask conditionalTask) throws SemanticException{ - - - //get current mapred work and its local work + ConditionalTask conditionalTask) throws SemanticException { + // get current mapred work and its local work MapredWork mapredWork = (MapredWork) currTask.getWork(); MapredLocalWork localwork = mapredWork.getMapLocalWork(); - - - if(localwork != null){ - //get the context info and set up the shared tmp URI + if (localwork != null) { + // get the context info and set up the shared tmp URI Context ctx = physicalContext.getContext(); String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId()); localwork.setTmpFileURI(tmpFileURI); String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId()); mapredWork.setTmpHDFSFileURI(hdfsTmpURI); - //create a task for this local work; right now, this local work is shared - //by the original MapredTask and this new generated MapredLocalTask. 
- MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork, - physicalContext.getParseContext().getConf()); + // create a task for this local work; right now, this local work is shared + // by the original MapredTask and this new generated MapredLocalTask. + MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork, physicalContext + .getParseContext().getConf()); - //set the backup task from curr task + // set the backup task from curr task localTask.setBackupTask(currTask.getBackupTask()); localTask.setBackupChildrenTasks(currTask.getBackupChildrenTasks()); currTask.setBackupChildrenTasks(null); currTask.setBackupTask(null); - if(currTask.getTaskTag() == Task.CONVERTED_MAPJOIN) { + if (currTask.getTaskTag() == Task.CONVERTED_MAPJOIN) { localTask.setTaskTag(Task.CONVERTED_LOCAL_MAPJOIN); } else { localTask.setTaskTag(Task.LOCAL_MAPJOIN); } + // replace the map join operator to local_map_join operator in the operator tree + // and return all the dummy parent + LocalMapJoinProcCtx localMapJoinProcCtx= adjustLocalTask(localTask); + List> dummyOps = localMapJoinProcCtx.getDummyParentOp(); - //replace the map join operator to local_map_join operator in the operator tree - //and return all the dummy parent - List> dummyOps= adjustLocalTask(localTask); - - //create new local work and setup the dummy ops + // create new local work and setup the dummy ops MapredLocalWork newLocalWork = new MapredLocalWork(); newLocalWork.setDummyParentOp(dummyOps); newLocalWork.setTmpFileURI(tmpFileURI); newLocalWork.setInputFileChangeSensitive(localwork.getInputFileChangeSensitive()); mapredWork.setMapLocalWork(newLocalWork); - - //get all parent tasks + // get all parent tasks List> parentTasks = currTask.getParentTasks(); currTask.setParentTasks(null); if (parentTasks != null) { - for (Task tsk : parentTasks) { - //make new generated task depends on all the parent tasks of current task. + // make new generated task depends on all the parent tasks of current task. 
tsk.addDependentTask(localTask); - //remove the current task from its original parent task's dependent task + // remove the current task from its original parent task's dependent task tsk.removeDependentTask(currTask); } - - }else{ - //in this case, current task is in the root tasks - //so add this new task into root tasks and remove the current task from root tasks - if(conditionalTask== null){ + } else { + // in this case, current task is in the root tasks + // so add this new task into root tasks and remove the current task from root tasks + if (conditionalTask == null) { physicalContext.addToRootTask(localTask); physicalContext.removeFromRootTask(currTask); - }else{ - //set list task + } else { + // set list task List> listTask = conditionalTask.getListTasks(); - ConditionalWork conditionalWork= conditionalTask.getWork(); + ConditionalWork conditionalWork = conditionalTask.getWork(); int index = listTask.indexOf(currTask); listTask.set(index, localTask); - - //set list work - List listWork = (List)conditionalWork.getListWorks(); + // set list work + List listWork = (List) conditionalWork.getListWorks(); index = listWork.indexOf(mapredWork); - listWork.set(index,(Serializable)localwork); + listWork.set(index, (Serializable) localwork); conditionalWork.setListWorks(listWork); - ConditionalResolver resolver = conditionalTask.getResolver(); - if(resolver instanceof ConditionalResolverSkewJoin){ - //get bigKeysDirToTaskMap - ConditionalResolverSkewJoinCtx context = - (ConditionalResolverSkewJoinCtx) conditionalTask.getResolverCtx(); - HashMap> bigKeysDirToTaskMap = - context.getDirToTaskMap(); - - //to avoid concurrent modify the hashmap - HashMap> newbigKeysDirToTaskMap = - new HashMap>(); - - - //reset the resolver - for(Map.Entry> entry: bigKeysDirToTaskMap.entrySet()){ + if (resolver instanceof ConditionalResolverSkewJoin) { + // get bigKeysDirToTaskMap + ConditionalResolverSkewJoinCtx context = (ConditionalResolverSkewJoinCtx) conditionalTask + .getResolverCtx(); + HashMap> bigKeysDirToTaskMap = context + .getDirToTaskMap(); + // to avoid concurrent modify the hashmap + HashMap> newbigKeysDirToTaskMap = new HashMap>(); + // reset the resolver + for (Map.Entry> entry : bigKeysDirToTaskMap + .entrySet()) { Task task = entry.getValue(); String key = entry.getKey(); - if(task.equals(currTask)){ + if (task.equals(currTask)) { newbigKeysDirToTaskMap.put(key, localTask); - }else{ + } else { newbigKeysDirToTaskMap.put(key, task); } } - context.setDirToTaskMap(newbigKeysDirToTaskMap); conditionalTask.setResolverCtx(context); - - }else if(resolver instanceof ConditionalResolverCommonJoin){ - //get bigKeysDirToTaskMap - ConditionalResolverCommonJoinCtx context = - (ConditionalResolverCommonJoinCtx) conditionalTask.getResolverCtx(); - HashMap> aliasToWork = - context.getAliasToTask(); - - //to avoid concurrent modify the hashmap - HashMap> newAliasToWork = - new HashMap>(); - - //reset the resolver - for(Map.Entry> entry: aliasToWork.entrySet()){ + } else if (resolver instanceof ConditionalResolverCommonJoin) { + // get bigKeysDirToTaskMap + ConditionalResolverCommonJoinCtx context = (ConditionalResolverCommonJoinCtx) conditionalTask + .getResolverCtx(); + HashMap> aliasToWork = context.getAliasToTask(); + // to avoid concurrent modify the hashmap + HashMap> newAliasToWork = new HashMap>(); + // reset the resolver + for (Map.Entry> entry : aliasToWork.entrySet()) { Task task = entry.getValue(); String key = entry.getKey(); - if(task.equals(currTask)){ + if (task.equals(currTask)) { 
newAliasToWork.put(key, localTask); - }else{ + } else { newAliasToWork.put(key, task); } } - context.setAliasToTask(newAliasToWork); conditionalTask.setResolverCtx(context); - - }else{ - } - - } } - - //make current task depends on this new generated localMapJoinTask - //now localTask is the parent task of the current task + // make current task depends on this new generated localMapJoinTask + // now localTask is the parent task of the current task localTask.addDependentTask(currTask); - } - } @Override public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) throws SemanticException { Task currTask = (Task) nd; - //not map reduce task or not conditional task, just skip - if(currTask.isMapRedTask() ){ - if(currTask instanceof ConditionalTask){ - //get the list of task - List> taskList = ((ConditionalTask) currTask).getListTasks(); - for(Task tsk : taskList){ - if(tsk.isMapRedTask()){ - this.processCurrentTask(tsk,((ConditionalTask) currTask)); + // not map reduce task or not conditional task, just skip + if (currTask.isMapRedTask()) { + if (currTask instanceof ConditionalTask) { + // get the list of task + List> taskList = ((ConditionalTask) currTask).getListTasks(); + for (Task tsk : taskList) { + if (tsk.isMapRedTask()) { + this.processCurrentTask(tsk, ((ConditionalTask) currTask)); } } - }else{ - this.processCurrentTask(currTask,null); + } else { + this.processCurrentTask(currTask, null); } } return null; } - //replace the map join operator to local_map_join operator in the operator tree - private List> adjustLocalTask(MapredLocalTask task) throws SemanticException { - - LocalMapJoinProcCtx localMapJoinProcCtx = new LocalMapJoinProcCtx(task, - physicalContext.getParseContext()); - + // replace the map join operator to local_map_join operator in the operator tree + private LocalMapJoinProcCtx adjustLocalTask(MapredLocalTask task) + throws SemanticException { + LocalMapJoinProcCtx localMapJoinProcCtx = new LocalMapJoinProcCtx(task, physicalContext + .getParseContext()); Map opRules = new LinkedHashMap(); - //opRules.put(new RuleRegExp("R1", "MAPJOIN%.*MAPJOIN%"), - //LocalMapJoinProcFactory.getMapJoinMapJoinProc()); opRules.put(new RuleRegExp("R1", "MAPJOIN%"), LocalMapJoinProcFactory.getJoinProc()); - // The dispatcher fires the processor corresponding to the closest // matching rule and passes the context along - Dispatcher disp = new DefaultRuleDispatcher(LocalMapJoinProcFactory - .getDefaultProc(), opRules, localMapJoinProcCtx); + Dispatcher disp = new DefaultRuleDispatcher(LocalMapJoinProcFactory.getDefaultProc(), + opRules, localMapJoinProcCtx); GraphWalker ogw = new DefaultGraphWalker(disp); - // iterator the reducer operator tree ArrayList topNodes = new ArrayList(); - topNodes.addAll(task.getWork().getAliasToWork().values()); ogw.startWalking(topNodes, null); - - return localMapJoinProcCtx.getDummyParentOp(); - + return localMapJoinProcCtx; } public PhysicalContext getPhysicalContext() { @@ -290,6 +256,7 @@ this.physicalContext = physicalContext; } } + /** * A container of current task and parse context. 
*/ @@ -297,12 +264,13 @@ private Task currentTask; private ParseContext parseCtx; private List> dummyParentOp = null; + private boolean isFollowedByGroupBy; - public LocalMapJoinProcCtx(Task task, - ParseContext parseCtx) { + public LocalMapJoinProcCtx(Task task, ParseContext parseCtx) { currentTask = task; this.parseCtx = parseCtx; dummyParentOp = new ArrayList>(); + isFollowedByGroupBy = false; } public Task getCurrentTask() { @@ -313,6 +281,13 @@ this.currentTask = currentTask; } + public boolean isFollowedByGroupBy() { + return isFollowedByGroupBy; + } + + public void setFollowedByGroupBy(boolean isFollowedByGroupBy) { + this.isFollowedByGroupBy = isFollowedByGroupBy; + } public ParseContext getParseCtx() { return parseCtx; } @@ -321,17 +296,16 @@ this.parseCtx = parseCtx; } - public void setDummyParentOp(List> op){ - this.dummyParentOp=op; + public void setDummyParentOp(List> op) { + this.dummyParentOp = op; } - public List> getDummyParentOp(){ + public List> getDummyParentOp() { return this.dummyParentOp; } - public void addDummyParentOp(Operator op){ - this.dummyParentOp.add(op); - } + public void addDummyParentOp(Operator op) { + this.dummyParentOp.add(op); + } } } - Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java (working copy) @@ -20,9 +20,13 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Stack; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; @@ -30,9 +34,15 @@ import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.optimizer.physical.MapJoinResolver.LocalMapJoinProcCtx; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc; @@ -44,129 +54,150 @@ * Node processor factory for skew join resolver. */ public final class LocalMapJoinProcFactory { - - - public static NodeProcessor getJoinProc() { return new LocalMapJoinProcessor(); } - public static NodeProcessor getMapJoinMapJoinProc() { - return new MapJoinMapJoinProc(); + + public static NodeProcessor getGroupByProc() { + return new MapJoinFollowByProcessor(); } + public static NodeProcessor getDefaultProc() { return new NodeProcessor() { @Override - public Object process(Node nd, Stack stack, - NodeProcessorCtx procCtx, Object... 
nodeOutputs) - throws SemanticException { + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { return null; } }; } /** + * MapJoinFollowByProcessor. + * + */ + public static class MapJoinFollowByProcessor implements NodeProcessor { + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) + throws SemanticException { + LocalMapJoinProcCtx context = (LocalMapJoinProcCtx) ctx; + if (!nd.getName().equals("GBY")) { + return null; + } + context.setFollowedByGroupBy(true); + GroupByOperator groupByOp = (GroupByOperator) nd; + float groupByMemoryUsage = context.getParseCtx().getConf().getFloatVar( + HiveConf.ConfVars.HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY); + groupByOp.setGroupByMemoryUsage(groupByMemoryUsage); + return null; + } + } + + /** * LocalMapJoinProcessor. * */ public static class LocalMapJoinProcessor implements NodeProcessor { - public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, - Object... nodeOutputs) throws SemanticException { + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) + throws SemanticException { LocalMapJoinProcCtx context = (LocalMapJoinProcCtx) ctx; - - if(!nd.getName().equals("MAPJOIN")){ + if (!nd.getName().equals("MAPJOIN")) { return null; } MapJoinOperator mapJoinOp = (MapJoinOperator) nd; + try { + hasGroupBy(mapJoinOp, context); + } catch (Exception e) { + e.printStackTrace(); + } HashTableSinkDesc hashTableSinkDesc = new HashTableSinkDesc(mapJoinOp.getConf()); - HashTableSinkOperator hashTableSinkOp =(HashTableSinkOperator)OperatorFactory.get(hashTableSinkDesc); + HashTableSinkOperator hashTableSinkOp = (HashTableSinkOperator) OperatorFactory + .get(hashTableSinkDesc); - //get the last operator for processing big tables + // set hashtable memory usage + float hashtableMemoryUsage; + if (context.isFollowedByGroupBy()) { + hashtableMemoryUsage = context.getParseCtx().getConf().getFloatVar( + HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE); + } else { + hashtableMemoryUsage = context.getParseCtx().getConf().getFloatVar( + HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE); + } + hashTableSinkOp.setHashtableMemoryUsage(hashtableMemoryUsage); + + // get the last operator for processing big tables int bigTable = mapJoinOp.getConf().getPosBigTable(); Byte[] order = mapJoinOp.getConf().getTagOrder(); - int bigTableAlias=(int)order[bigTable]; - + int bigTableAlias = (int) order[bigTable]; Operator bigOp = mapJoinOp.getParentOperators().get(bigTable); - //the parent ops for hashTableSinkOp - List> smallTablesParentOp= new ArrayList>(); - - List> dummyOperators= new ArrayList>(); - //get all parents - List > parentsOp = mapJoinOp.getParentOperators(); - for(int i = 0; i> smallTablesParentOp = new ArrayList>(); + List> dummyOperators = new ArrayList>(); + // get all parents + List> parentsOp = mapJoinOp.getParentOperators(); + for (int i = 0; i < parentsOp.size(); i++) { + if (i == bigTableAlias) { smallTablesParentOp.add(null); continue; } - Operator parent = parentsOp.get(i); - //let hashtable Op be the child of this parent + // let hashtable Op be the child of this parent parent.replaceChild(mapJoinOp, hashTableSinkOp); - //keep the parent id correct + // keep the parent id correct smallTablesParentOp.add(parent); - //create an new operator: HashTable DummyOpeator, which share the table desc + // create an new operator: HashTable DummyOpeator, which share the table desc HashTableDummyDesc desc = new 
HashTableDummyDesc(); - HashTableDummyOperator dummyOp =(HashTableDummyOperator)OperatorFactory.get(desc); + HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc); TableDesc tbl; - if(parent.getSchema()==null){ - if(parent instanceof TableScanOperator ){ - tbl = ((TableScanOperator)parent).getTableDesc(); - }else{ - throw new SemanticException(); - } - }else{ - //get parent schema + if (parent.getSchema() == null) { + if (parent instanceof TableScanOperator) { + tbl = ((TableScanOperator) parent).getTableDesc(); + } else { + throw new SemanticException(); + } + } else { + // get parent schema RowSchema rowSchema = parent.getSchema(); - tbl = PlanUtils.getIntermediateFileTableDesc(PlanUtils - .getFieldSchemasFromRowSchema(rowSchema, "")); + tbl = PlanUtils.getIntermediateFileTableDesc(PlanUtils.getFieldSchemasFromRowSchema( + rowSchema, "")); } - - dummyOp.getConf().setTbl(tbl); - - //let the dummy op be the parent of mapjoin op + // let the dummy op be the parent of mapjoin op mapJoinOp.replaceParent(parent, dummyOp); List> dummyChildren = new ArrayList>(); dummyChildren.add(mapJoinOp); dummyOp.setChildOperators(dummyChildren); - - //add this dummy op to the dummp operator list + // add this dummy op to the dummp operator list dummyOperators.add(dummyOp); - } - hashTableSinkOp.setParentOperators(smallTablesParentOp); - for(Operator op: dummyOperators){ + for (Operator op : dummyOperators) { context.addDummyParentOp(op); } return null; } - } - - /** - * LocalMapJoinProcessor. - * - */ - public static class MapJoinMapJoinProc implements NodeProcessor { - public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, - Object... nodeOutputs) throws SemanticException { - LocalMapJoinProcCtx context = (LocalMapJoinProcCtx) ctx; - if(!nd.getName().equals("MAPJOIN")){ - return null; - } - System.out.println("Mapjoin * MapJoin"); - - return null; + public void hasGroupBy(Operator mapJoinOp, + LocalMapJoinProcCtx localMapJoinProcCtx) throws Exception { + List> childOps = mapJoinOp.getChildOperators(); + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", "GBY%"), LocalMapJoinProcFactory.getGroupByProc()); + // The dispatcher fires the processor corresponding to the closest + // matching rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(LocalMapJoinProcFactory.getDefaultProc(), + opRules, localMapJoinProcCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + // iterator the reducer operator tree + ArrayList topNodes = new ArrayList(); + topNodes.addAll(childOps); + ogw.startWalking(topNodes, null); } } - private LocalMapJoinProcFactory() { // prevent instantiation } - } - +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (working copy) @@ -92,7 +92,6 @@ JoinDesc joinDesc = joinOp.getConf(); Byte[] order = joinDesc.getTagOrder(); int numAliases = order.length; - try { HashSet smallTableOnlySet = MapJoinProcessor.getSmallTableOnlySet(joinDesc .getConds()); @@ -115,7 +114,6 @@ if (smallTableOnlySet.contains(i)) { continue; } - // create map join task and set big table as i // deep copy a new mapred work from xml InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8")); @@ -148,9 +146,7 @@ aliasToPath.put(bigTableAlias, 
path);
           }
         }
-
       }
-
     } catch (Exception e) {
       e.printStackTrace();
       throw new SemanticException("Generate Map Join Task Error: " + e.getMessage());
@@ -212,7 +208,6 @@
       }
     }
 
-
     @Override
     public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) throws SemanticException {
@@ -243,19 +238,16 @@
       return null;
     }
 
-
     private JoinOperator getJoinOp(MapRedTask task) throws SemanticException {
       if (task.getWork() == null) {
         return null;
       }
-
       Operator reducerOp = task.getWork().getReducer();
       if (reducerOp instanceof JoinOperator) {
         return (JoinOperator) reducerOp;
       } else {
         return null;
       }
-
     }
   }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java	(revision 1042768)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java	(working copy)
@@ -117,8 +117,8 @@
   protected transient LogHelper console;
   private long hashTableScale;
   private boolean isAbort = false;
+  private float hashtableMemoryUsage = 0l;
 
-
   public static class HashTableSinkObjectCtx {
     ObjectInspector standardOI;
     SerDe serde;
@@ -177,7 +177,14 @@
     this.conf = new HashTableSinkDesc(mjop.getConf());
   }
 
+  public float getHashtableMemoryUsage() {
+    return hashtableMemoryUsage;
+  }
+  public void setHashtableMemoryUsage(float hashtableMemoryUsage) {
+    this.hashtableMemoryUsage = hashtableMemoryUsage;
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
@@ -244,14 +251,13 @@
     for (int pos = 0; pos < numAliases; pos++) {
       metadataValueTag[pos] = -1;
     }
-
     mapJoinTables = new HashMap>();
 
     int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
     float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
         HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
-    float hashTableMaxMemoryUsage = HiveConf.getFloatVar(hconf,
-        HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
+    float hashTableMaxMemoryUsage = this.hashtableMemoryUsage;
+
     hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE);
     if (hashTableScale <= 0) {
       hashTableScale = 1;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java	(revision 1042768)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java	(working copy)
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -133,6 +135,10 @@
   // new Key ObjectInspectors are objectInspectors from the parent
   transient StructObjectInspector newKeyObjectInspector;
   transient StructObjectInspector currentKeyObjectInspector;
+  private float groupByMemoryUsage = 0l;
+  public static MemoryMXBean memoryMXBean;
+  private long maxMemory;
+  private float memoryThreshold;
 
   /**
    * This is used to store the position and field names for variable length
@@ -373,6 +379,9 @@
     if (hashAggr) {
       computeMaxEntriesHashAggr(hconf);
     }
+    memoryMXBean = ManagementFactory.getMemoryMXBean();
+    maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+    memoryThreshold = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
     initializeChildren(hconf);
   }
 
@@ -386,8 +395,12 @@
    * aggregation only
    **/
   private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException {
-    maxHashTblMemory = (long) (HiveConf.getFloatVar(hconf,
-        HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY) * Runtime.getRuntime().maxMemory());
+    float memoryPercentage = this.groupByMemoryUsage;
+    if(memoryPercentage == 0){
+      memoryPercentage = HiveConf.getFloatVar(hconf,
+          HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+    }
+    maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory());
     estimateRowSize();
   }
 
@@ -678,6 +691,14 @@
     }
   }
 
+  public float getGroupByMemoryUsage() {
+    return groupByMemoryUsage;
+  }
+
+  public void setGroupByMemoryUsage(float groupByMemoryUsage) {
+    this.groupByMemoryUsage = groupByMemoryUsage;
+  }
+
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     firstRow = false;
@@ -824,10 +845,18 @@
    **/
   private boolean shouldBeFlushed(KeyWrapper newKeys) {
     int numEntries = hashAggregations.size();
+    long usedMemory;
+    float rate;
 
     // The fixed size for the aggregation class is already known. Get the
     // variable portion of the size every NUMROWSESTIMATESIZE rows.
     if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) {
+      // check how much memory is left
+      usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
+      rate = (float) usedMemory / (float) maxMemory;
+      if(rate > memoryThreshold){
+        return true;
+      }
       for (Integer pos : keyPositionsSize) {
         Object key = newKeys.getKeyArray()[pos.intValue()];
 
         // Ignore nulls
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java	(revision 1042768)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java	(working copy)
@@ -161,7 +161,6 @@
     int size = mHash.size();
     long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
     double rate = (double) usedMemory / (double) maxMemory;
-    long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
     console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
         + size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate));
     if (rate > (double) maxMemoryUsage) {
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java	(revision 1042768)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java	(working copy)
@@ -45,7 +45,6 @@
   private String tmpFileURI;
   private String stageID;
 
-
   private List> dummyParentOp ;
 
   public MapredLocalWork() {
@@ -90,7 +89,6 @@
     this.stageID = stageID;
   }
 
-
   public void setAliasToWork(
       final LinkedHashMap> aliasToWork) {
     this.aliasToWork = aliasToWork;
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java	(revision 1042768)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java	(working copy)
@@ -75,7 +75,6 @@
 
   private QBJoinTree joinTree;
 
-
   public MapredWork() {
     aliasToPartnInfo = new LinkedHashMap();
   }
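
Editor's note (not part of the patch): the behavioral core of this change is twofold. First, when a map join feeds a map-side group by, the plan now uses the smaller memory budgets added to HiveConf (hive.mapjoin.followby.map.aggr.hash.percentmemory, default 0.3, and hive.mapjoin.followby.gby.localtask.max.memory.usage, default 0.55) instead of the general-purpose hive.map.aggr.hash.percentmemory and hive.mapjoin.localtask.max.memory.usage values, so the hash-table sink and the group-by hash aggregation do not both claim most of the heap. Second, GroupByOperator.shouldBeFlushed() now also flushes the aggregation hash table whenever overall heap usage crosses hive.map.aggr.hash.force.flush.memory.threshold (default 0.9). The stand-alone Java sketch below only illustrates that threshold check with the same JDK MemoryMXBean calls the patch uses; the class name and constructor are hypothetical and the code is not taken from this patch.

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class ForcedFlushSketch {
  // Stand-in for the value read from HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD (0.9 by default).
  private final float memoryThreshold;
  private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
  // Maximum heap size, sampled once, mirroring what GroupByOperator.initializeOp() does after this patch.
  private final long maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();

  public ForcedFlushSketch(float memoryThreshold) {
    this.memoryThreshold = memoryThreshold;
  }

  // Returns true when used heap divided by max heap exceeds the configured threshold,
  // which is the extra condition shouldBeFlushed() checks after this patch.
  public boolean overThreshold() {
    long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
    float rate = (float) usedMemory / (float) maxMemory;
    return rate > memoryThreshold;
  }

  public static void main(String[] args) {
    ForcedFlushSketch sketch = new ForcedFlushSketch(0.9f);
    System.out.println("force flush now? " + sketch.overThreshold());
  }
}

As in the new auto_join26.q test, the join conversion itself is still driven by set hive.auto.convert.join = true; the new properties above only tune how much memory the converted plan grants to the local hash table and the map-side aggregation.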