Index: conf/hive-default.xml =================================================================== --- conf/hive-default.xml (revision 1042768) +++ conf/hive-default.xml (working copy) @@ -253,6 +253,18 @@ + hive.mapjoin.followby.map.aggr.hash.percentmemory + 0.3 + Portion of total memory to be used by map-side group aggregation hash table, when this group by is followed by map join + + + + hive.map.aggr.hash.force.flush.memory.threshold + 0.9 + The max memory to be used by map-side group aggregation hash table, if the memory usage is higher than this number, force to flush data + + + hive.map.aggr.hash.percentmemory 0.5 Portion of total memory to be used by map-side grup aggregation hash table @@ -505,6 +517,12 @@ + hive.mapjoin.followby.gby.localtask.max.memory.usage + 0.55 + This number means how much memory the local task can take to hold the key/value into the in-memory hash table when this map join is followed by a group by; If the local task's memory usage is more than this number, the local task will abort itself. It means the data of the small table is too large to be held in memory. + + + hive.mapjoin.check.memory.rows 100000 The number means after how many rows processed it needs to check the memory usage @@ -649,7 +667,6 @@ Maximum number of HDFS files created by all mappers/reducers in a MapReduce job. 
- hive.exec.default.partition.name __HIVE_DEFAULT_PARTITION__ Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1042768) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -202,6 +202,8 @@ HIVEMAPJOINCACHEROWS("hive.mapjoin.cache.numrows", 25000), HIVEGROUPBYMAPINTERVAL("hive.groupby.mapaggr.checkinterval", 100000), HIVEMAPAGGRHASHMEMORY("hive.map.aggr.hash.percentmemory", (float) 0.5), + HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY("hive.mapjoin.followby.map.aggr.hash.percentmemory", (float) 0.3), + HIVEMAPAGGRMEMORYTHRESHOLD("hive.map.aggr.hash.force.flush.memory.threshold", (float) 0.9), HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5), // for hive udtf operator @@ -256,6 +258,7 @@ HIVEMAXMAPJOINSIZE("hive.mapjoin.maxsize", 100000), HIVEHASHTABLETHRESHOLD("hive.hashtable.initialCapacity", 100000), HIVEHASHTABLELOADFACTOR("hive.hashtable.loadfactor", (float) 0.75), + HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE("hive.mapjoin.followby.gby.localtask.max.memory.usage", (float) 0.55), HIVEHASHTABLEMAXMEMORYUSAGE("hive.mapjoin.localtask.max.memory.usage", (float) 0.90), HIVEHASHTABLESCALE("hive.mapjoin.check.memory.rows", (long)100000), @@ -315,7 +318,7 @@ HIVEFETCHOUTPUTSERDE("hive.fetch.output.serde", "org.apache.hadoop.hive.serde2.DelimitedJSONSerDe"), SEMANTIC_ANALYZER_HOOK("hive.semantic.analyzer.hook",null), - + // Print column names in output HIVE_CLI_PRINT_HEADER("hive.cli.print.header", false); ; Index: ql/src/test/results/clientpositive/auto_join26.q.out =================================================================== --- ql/src/test/results/clientpositive/auto_join26.q.out (revision 0) +++ ql/src/test/results/clientpositive/auto_join26.q.out (revision 0) @@ -0,0 +1,315 @@ +PREHOOK: query: CREATE TABLE dest_j1(key INT, cnt INT) +PREHOOK: type: CREATETABLE 
+POSTHOOK: query: CREATE TABLE dest_j1(key INT, cnt INT) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@dest_j1 +PREHOOK: query: EXPLAIN +INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF src1 x) (TOK_TABREF src y) (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL y) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB dest_j1)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL x) key)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL x) key)))) + +STAGE DEPENDENCIES: + Stage-7 is a root stage , consists of Stage-8, Stage-9, Stage-1 + Stage-8 has a backup stage: Stage-1 + Stage-5 depends on stages: Stage-8 + Stage-2 depends on stages: Stage-1, Stage-5, Stage-6 + Stage-0 depends on stages: Stage-2 + Stage-3 depends on stages: Stage-0 + Stage-9 has a backup stage: Stage-1 + Stage-6 depends on stages: Stage-9 + Stage-1 + +STAGE PLANS: + Stage: Stage-7 + Conditional Operator + + Stage: Stage-8 + Map Reduce Local Work + Alias -> Map Local Tables: + y + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + y + TableScan + alias: y + HashTable Sink Operator + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 0 + + Stage: Stage-5 + Map Reduce + Alias -> Map Operator Tree: + x + TableScan + alias: x + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0 + Position of Big Table: 0 + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Group By Operator + aggregations: + expr: count(1) 
+ bucketGroup: false + keys: + expr: _col0 + type: string + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + Local Work: + Map Reduce Local Work + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: + file:/tmp/liyintang/hive_2010-12-06_15-13-36_873_2449071063337197304/-mr-10002 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + Map-reduce partition columns: + expr: _col0 + type: string + tag: -1 + value expressions: + expr: _col1 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: bigint + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: UDFToInteger(_col0) + type: int + expr: UDFToInteger(_col1) + type: int + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: dest_j1 + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: dest_j1 + + Stage: Stage-3 + Stats-Aggr Operator + + Stage: Stage-9 + Map Reduce Local Work + Alias -> Map Local Tables: + x + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + x + TableScan + alias: x + HashTable Sink Operator + condition 
expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 1 + + Stage: Stage-6 + Map Reduce + Alias -> Map Operator Tree: + y + TableScan + alias: y + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {key} + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + outputColumnNames: _col0 + Position of Big Table: 1 + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: _col0 + type: string + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + Local Work: + Map Reduce Local Work + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + x + TableScan + alias: x + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + tag: 0 + value expressions: + expr: key + type: string + y + TableScan + alias: y + Reduce Output Operator + key expressions: + expr: key + type: string + sort order: + + Map-reduce partition columns: + expr: key + type: string + tag: 1 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {VALUE._col0} + 1 + handleSkewJoin: false + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: string + outputColumnNames: _col0 + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: _col0 + type: string + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + + +PREHOOK: query: INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src1 +PREHOOK: Output: default@dest_j1 +POSTHOOK: query: INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@src1 +POSTHOOK: Output: default@dest_j1 +POSTHOOK: Lineage: dest_j1.cnt EXPRESSION [(src1)x.null, (src)y.null, ] +POSTHOOK: Lineage: dest_j1.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: select * from dest_j1 x order by x.key +PREHOOK: type: QUERY +PREHOOK: Input: default@dest_j1 +PREHOOK: Output: file:/tmp/liyintang/hive_2010-12-06_15-13-51_429_1293784409064103219/-mr-10000 +POSTHOOK: query: select * from dest_j1 x order by x.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest_j1 +POSTHOOK: Output: file:/tmp/liyintang/hive_2010-12-06_15-13-51_429_1293784409064103219/-mr-10000 +POSTHOOK: Lineage: dest_j1.cnt EXPRESSION [(src1)x.null, (src)y.null, ] +POSTHOOK: Lineage: dest_j1.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ] +66 1 +98 2 +128 3 +146 2 +150 1 +213 2 +224 2 +238 2 +255 2 +273 3 +278 2 +311 3 +369 3 +401 5 +406 4 Index: ql/src/test/queries/clientpositive/auto_join26.q =================================================================== --- ql/src/test/queries/clientpositive/auto_join26.q (revision 0) +++ ql/src/test/queries/clientpositive/auto_join26.q (revision 0) @@ -0,0 +1,10 @@ +CREATE TABLE dest_j1(key INT, cnt INT); +set hive.auto.convert.join = true; +EXPLAIN +INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key; + +INSERT OVERWRITE TABLE dest_j1 +SELECT x.key, count(1) FROM src1 x JOIN src y ON 
(x.key = y.key) group by x.key; + +select * from dest_j1 x order by x.key; Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (working copy) @@ -55,31 +55,32 @@ import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx; /** - * An implementation of PhysicalPlanResolver. It iterator each MapRedTask to see whether the task has a local map work - * if it has, it will move the local work to a new local map join task. Then it will make this new generated task depends on - * current task's parent task and make current task depends on this new generated task. + * An implementation of PhysicalPlanResolver. It iterator each MapRedTask to see whether the task + * has a local map work if it has, it will move the local work to a new local map join task. Then it + * will make this new generated task depends on current task's parent task and make current task + * depends on this new generated task. */ public class MapJoinResolver implements PhysicalPlanResolver { @Override public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { - //create dispatcher and graph walker + // create dispatcher and graph walker Dispatcher disp = new LocalMapJoinTaskDispatcher(pctx); TaskGraphWalker ogw = new TaskGraphWalker(disp); - //get all the tasks nodes from root task + // get all the tasks nodes from root task ArrayList topNodes = new ArrayList(); topNodes.addAll(pctx.rootTasks); - //begin to walk through the task tree. + // begin to walk through the task tree. ogw.startWalking(topNodes, null); return pctx; } /** - * Iterator each tasks. If this task has a local work,create a new task for this local work, named MapredLocalTask. 
- * then make this new generated task depends on current task's parent task, and make current task - * depends on this new generated task + * Iterator each tasks. If this task has a local work,create a new task for this local work, named + * MapredLocalTask. then make this new generated task depends on current task's parent task, and + * make current task depends on this new generated task */ class LocalMapJoinTaskDispatcher implements Dispatcher { @@ -91,195 +92,160 @@ } private void processCurrentTask(Task currTask, - ConditionalTask conditionalTask) throws SemanticException{ - - - //get current mapred work and its local work + ConditionalTask conditionalTask) throws SemanticException { + // get current mapred work and its local work MapredWork mapredWork = (MapredWork) currTask.getWork(); MapredLocalWork localwork = mapredWork.getMapLocalWork(); - - - if(localwork != null){ - //get the context info and set up the shared tmp URI + if (localwork != null) { + // get the context info and set up the shared tmp URI Context ctx = physicalContext.getContext(); String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId()); localwork.setTmpFileURI(tmpFileURI); String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId()); mapredWork.setTmpHDFSFileURI(hdfsTmpURI); - //create a task for this local work; right now, this local work is shared - //by the original MapredTask and this new generated MapredLocalTask. - MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork, - physicalContext.getParseContext().getConf()); + // create a task for this local work; right now, this local work is shared + // by the original MapredTask and this new generated MapredLocalTask. 
+ MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork, physicalContext + .getParseContext().getConf()); - //set the backup task from curr task + // set the backup task from curr task localTask.setBackupTask(currTask.getBackupTask()); localTask.setBackupChildrenTasks(currTask.getBackupChildrenTasks()); currTask.setBackupChildrenTasks(null); currTask.setBackupTask(null); - if(currTask.getTaskTag() == Task.CONVERTED_MAPJOIN) { + if (currTask.getTaskTag() == Task.CONVERTED_MAPJOIN) { localTask.setTaskTag(Task.CONVERTED_LOCAL_MAPJOIN); } else { localTask.setTaskTag(Task.LOCAL_MAPJOIN); } + // replace the map join operator to local_map_join operator in the operator tree + // and return all the dummy parent + LocalMapJoinProcCtx localMapJoinProcCtx= adjustLocalTask(localTask); + List> dummyOps = localMapJoinProcCtx.getDummyParentOp(); - //replace the map join operator to local_map_join operator in the operator tree - //and return all the dummy parent - List> dummyOps= adjustLocalTask(localTask); - - //create new local work and setup the dummy ops + // create new local work and setup the dummy ops MapredLocalWork newLocalWork = new MapredLocalWork(); newLocalWork.setDummyParentOp(dummyOps); newLocalWork.setTmpFileURI(tmpFileURI); newLocalWork.setInputFileChangeSensitive(localwork.getInputFileChangeSensitive()); mapredWork.setMapLocalWork(newLocalWork); - - //get all parent tasks + // get all parent tasks List> parentTasks = currTask.getParentTasks(); currTask.setParentTasks(null); if (parentTasks != null) { - for (Task tsk : parentTasks) { - //make new generated task depends on all the parent tasks of current task. + // make new generated task depends on all the parent tasks of current task. 
tsk.addDependentTask(localTask); - //remove the current task from its original parent task's dependent task + // remove the current task from its original parent task's dependent task tsk.removeDependentTask(currTask); } - - }else{ - //in this case, current task is in the root tasks - //so add this new task into root tasks and remove the current task from root tasks - if(conditionalTask== null){ + } else { + // in this case, current task is in the root tasks + // so add this new task into root tasks and remove the current task from root tasks + if (conditionalTask == null) { physicalContext.addToRootTask(localTask); physicalContext.removeFromRootTask(currTask); - }else{ - //set list task + } else { + // set list task List> listTask = conditionalTask.getListTasks(); - ConditionalWork conditionalWork= conditionalTask.getWork(); + ConditionalWork conditionalWork = conditionalTask.getWork(); int index = listTask.indexOf(currTask); listTask.set(index, localTask); - - //set list work - List listWork = (List)conditionalWork.getListWorks(); + // set list work + List listWork = (List) conditionalWork.getListWorks(); index = listWork.indexOf(mapredWork); - listWork.set(index,(Serializable)localwork); + listWork.set(index, (Serializable) localwork); conditionalWork.setListWorks(listWork); - ConditionalResolver resolver = conditionalTask.getResolver(); - if(resolver instanceof ConditionalResolverSkewJoin){ - //get bigKeysDirToTaskMap - ConditionalResolverSkewJoinCtx context = - (ConditionalResolverSkewJoinCtx) conditionalTask.getResolverCtx(); - HashMap> bigKeysDirToTaskMap = - context.getDirToTaskMap(); - - //to avoid concurrent modify the hashmap - HashMap> newbigKeysDirToTaskMap = - new HashMap>(); - - - //reset the resolver - for(Map.Entry> entry: bigKeysDirToTaskMap.entrySet()){ + if (resolver instanceof ConditionalResolverSkewJoin) { + // get bigKeysDirToTaskMap + ConditionalResolverSkewJoinCtx context = (ConditionalResolverSkewJoinCtx) conditionalTask + 
.getResolverCtx(); + HashMap> bigKeysDirToTaskMap = context + .getDirToTaskMap(); + // to avoid concurrent modify the hashmap + HashMap> newbigKeysDirToTaskMap = new HashMap>(); + // reset the resolver + for (Map.Entry> entry : bigKeysDirToTaskMap + .entrySet()) { Task task = entry.getValue(); String key = entry.getKey(); - if(task.equals(currTask)){ + if (task.equals(currTask)) { newbigKeysDirToTaskMap.put(key, localTask); - }else{ + } else { newbigKeysDirToTaskMap.put(key, task); } } - context.setDirToTaskMap(newbigKeysDirToTaskMap); conditionalTask.setResolverCtx(context); - - }else if(resolver instanceof ConditionalResolverCommonJoin){ - //get bigKeysDirToTaskMap - ConditionalResolverCommonJoinCtx context = - (ConditionalResolverCommonJoinCtx) conditionalTask.getResolverCtx(); - HashMap> aliasToWork = - context.getAliasToTask(); - - //to avoid concurrent modify the hashmap - HashMap> newAliasToWork = - new HashMap>(); - - //reset the resolver - for(Map.Entry> entry: aliasToWork.entrySet()){ + } else if (resolver instanceof ConditionalResolverCommonJoin) { + // get bigKeysDirToTaskMap + ConditionalResolverCommonJoinCtx context = (ConditionalResolverCommonJoinCtx) conditionalTask + .getResolverCtx(); + HashMap> aliasToWork = context.getAliasToTask(); + // to avoid concurrent modify the hashmap + HashMap> newAliasToWork = new HashMap>(); + // reset the resolver + for (Map.Entry> entry : aliasToWork.entrySet()) { Task task = entry.getValue(); String key = entry.getKey(); - if(task.equals(currTask)){ + if (task.equals(currTask)) { newAliasToWork.put(key, localTask); - }else{ + } else { newAliasToWork.put(key, task); } } - context.setAliasToTask(newAliasToWork); conditionalTask.setResolverCtx(context); - - }else{ - } - - } } - - //make current task depends on this new generated localMapJoinTask - //now localTask is the parent task of the current task + // make current task depends on this new generated localMapJoinTask + // now localTask is the parent task of the 
current task localTask.addDependentTask(currTask); - } - } @Override public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) throws SemanticException { Task currTask = (Task) nd; - //not map reduce task or not conditional task, just skip - if(currTask.isMapRedTask() ){ - if(currTask instanceof ConditionalTask){ - //get the list of task - List> taskList = ((ConditionalTask) currTask).getListTasks(); - for(Task tsk : taskList){ - if(tsk.isMapRedTask()){ - this.processCurrentTask(tsk,((ConditionalTask) currTask)); + // not map reduce task or not conditional task, just skip + if (currTask.isMapRedTask()) { + if (currTask instanceof ConditionalTask) { + // get the list of task + List> taskList = ((ConditionalTask) currTask).getListTasks(); + for (Task tsk : taskList) { + if (tsk.isMapRedTask()) { + this.processCurrentTask(tsk, ((ConditionalTask) currTask)); } } - }else{ - this.processCurrentTask(currTask,null); + } else { + this.processCurrentTask(currTask, null); } } return null; } - //replace the map join operator to local_map_join operator in the operator tree - private List> adjustLocalTask(MapredLocalTask task) throws SemanticException { - - LocalMapJoinProcCtx localMapJoinProcCtx = new LocalMapJoinProcCtx(task, - physicalContext.getParseContext()); - + // replace the map join operator to local_map_join operator in the operator tree + private LocalMapJoinProcCtx adjustLocalTask(MapredLocalTask task) + throws SemanticException { + LocalMapJoinProcCtx localMapJoinProcCtx = new LocalMapJoinProcCtx(task, physicalContext + .getParseContext()); Map opRules = new LinkedHashMap(); - //opRules.put(new RuleRegExp("R1", "MAPJOIN%.*MAPJOIN%"), - //LocalMapJoinProcFactory.getMapJoinMapJoinProc()); opRules.put(new RuleRegExp("R1", "MAPJOIN%"), LocalMapJoinProcFactory.getJoinProc()); - // The dispatcher fires the processor corresponding to the closest // matching rule and passes the context along - Dispatcher disp = new DefaultRuleDispatcher(LocalMapJoinProcFactory - 
.getDefaultProc(), opRules, localMapJoinProcCtx); + Dispatcher disp = new DefaultRuleDispatcher(LocalMapJoinProcFactory.getDefaultProc(), + opRules, localMapJoinProcCtx); GraphWalker ogw = new DefaultGraphWalker(disp); - // iterator the reducer operator tree ArrayList topNodes = new ArrayList(); - topNodes.addAll(task.getWork().getAliasToWork().values()); ogw.startWalking(topNodes, null); - - return localMapJoinProcCtx.getDummyParentOp(); - + return localMapJoinProcCtx; } public PhysicalContext getPhysicalContext() { @@ -290,6 +256,7 @@ this.physicalContext = physicalContext; } } + /** * A container of current task and parse context. */ @@ -297,12 +264,13 @@ private Task currentTask; private ParseContext parseCtx; private List> dummyParentOp = null; + private boolean isFollowedByGroupBy; - public LocalMapJoinProcCtx(Task task, - ParseContext parseCtx) { + public LocalMapJoinProcCtx(Task task, ParseContext parseCtx) { currentTask = task; this.parseCtx = parseCtx; dummyParentOp = new ArrayList>(); + isFollowedByGroupBy = false; } public Task getCurrentTask() { @@ -313,6 +281,13 @@ this.currentTask = currentTask; } + public boolean isFollowedByGroupBy() { + return isFollowedByGroupBy; + } + + public void setFollowedByGroupBy(boolean isFollowedByGroupBy) { + this.isFollowedByGroupBy = isFollowedByGroupBy; + } public ParseContext getParseCtx() { return parseCtx; } @@ -321,17 +296,16 @@ this.parseCtx = parseCtx; } - public void setDummyParentOp(List> op){ - this.dummyParentOp=op; + public void setDummyParentOp(List> op) { + this.dummyParentOp = op; } - public List> getDummyParentOp(){ + public List> getDummyParentOp() { return this.dummyParentOp; } - public void addDummyParentOp(Operator op){ - this.dummyParentOp.add(op); - } + public void addDummyParentOp(Operator op) { + this.dummyParentOp.add(op); + } } } - Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java =================================================================== 
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java (working copy) @@ -20,9 +20,13 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Stack; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; @@ -30,9 +34,15 @@ import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.optimizer.physical.MapJoinResolver.LocalMapJoinProcCtx; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc; @@ -44,129 +54,154 @@ * Node processor factory for skew join resolver. 
*/ public final class LocalMapJoinProcFactory { - - - public static NodeProcessor getJoinProc() { return new LocalMapJoinProcessor(); } - public static NodeProcessor getMapJoinMapJoinProc() { - return new MapJoinMapJoinProc(); + + public static NodeProcessor getGroupByProc() { + return new MapJoinFollowByProcessor(); } + public static NodeProcessor getDefaultProc() { return new NodeProcessor() { @Override - public Object process(Node nd, Stack stack, - NodeProcessorCtx procCtx, Object... nodeOutputs) - throws SemanticException { + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { return null; } }; } /** + * MapJoinFollowByProcessor. + * + */ + public static class MapJoinFollowByProcessor implements NodeProcessor { + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) + throws SemanticException { + LocalMapJoinProcCtx context = (LocalMapJoinProcCtx) ctx; + if (!nd.getName().equals("GBY")) { + return null; + } + context.setFollowedByGroupBy(true); + GroupByOperator groupByOp = (GroupByOperator) nd; + float groupByMemoryUsage = context.getParseCtx().getConf().getFloatVar( + HiveConf.ConfVars.HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY); + if (groupByOp.getConf() == null) { + System.out.println("Group by desc is null"); + return null; + } + groupByOp.getConf().setGroupByMemoryUsage(groupByMemoryUsage); + return null; + } + } + + /** * LocalMapJoinProcessor. * */ public static class LocalMapJoinProcessor implements NodeProcessor { - public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, - Object... nodeOutputs) throws SemanticException { + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... 
nodeOutputs) + throws SemanticException { LocalMapJoinProcCtx context = (LocalMapJoinProcCtx) ctx; - - if(!nd.getName().equals("MAPJOIN")){ + if (!nd.getName().equals("MAPJOIN")) { return null; } MapJoinOperator mapJoinOp = (MapJoinOperator) nd; + try { + hasGroupBy(mapJoinOp, context); + } catch (Exception e) { + e.printStackTrace(); + } HashTableSinkDesc hashTableSinkDesc = new HashTableSinkDesc(mapJoinOp.getConf()); - HashTableSinkOperator hashTableSinkOp =(HashTableSinkOperator)OperatorFactory.get(hashTableSinkDesc); + HashTableSinkOperator hashTableSinkOp = (HashTableSinkOperator) OperatorFactory + .get(hashTableSinkDesc); - //get the last operator for processing big tables + // set hashtable memory usage + float hashtableMemoryUsage; + if (context.isFollowedByGroupBy()) { + hashtableMemoryUsage = context.getParseCtx().getConf().getFloatVar( + HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE); + } else { + hashtableMemoryUsage = context.getParseCtx().getConf().getFloatVar( + HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE); + } + hashTableSinkOp.getConf().setHashtableMemoryUsage(hashtableMemoryUsage); + + // get the last operator for processing big tables int bigTable = mapJoinOp.getConf().getPosBigTable(); Byte[] order = mapJoinOp.getConf().getTagOrder(); - int bigTableAlias=(int)order[bigTable]; - + int bigTableAlias = (int) order[bigTable]; Operator bigOp = mapJoinOp.getParentOperators().get(bigTable); - //the parent ops for hashTableSinkOp - List> smallTablesParentOp= new ArrayList>(); - - List> dummyOperators= new ArrayList>(); - //get all parents - List > parentsOp = mapJoinOp.getParentOperators(); - for(int i = 0; i> smallTablesParentOp = new ArrayList>(); + List> dummyOperators = new ArrayList>(); + // get all parents + List> parentsOp = mapJoinOp.getParentOperators(); + for (int i = 0; i < parentsOp.size(); i++) { + if (i == bigTableAlias) { smallTablesParentOp.add(null); continue; } - Operator parent = parentsOp.get(i); - //let hashtable Op be 
the child of this parent + // let hashtable Op be the child of this parent parent.replaceChild(mapJoinOp, hashTableSinkOp); - //keep the parent id correct + // keep the parent id correct smallTablesParentOp.add(parent); - //create an new operator: HashTable DummyOpeator, which share the table desc + // create an new operator: HashTable DummyOpeator, which share the table desc HashTableDummyDesc desc = new HashTableDummyDesc(); - HashTableDummyOperator dummyOp =(HashTableDummyOperator)OperatorFactory.get(desc); + HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc); TableDesc tbl; - if(parent.getSchema()==null){ - if(parent instanceof TableScanOperator ){ - tbl = ((TableScanOperator)parent).getTableDesc(); - }else{ - throw new SemanticException(); - } - }else{ - //get parent schema + if (parent.getSchema() == null) { + if (parent instanceof TableScanOperator) { + tbl = ((TableScanOperator) parent).getTableDesc(); + } else { + throw new SemanticException(); + } + } else { + // get parent schema RowSchema rowSchema = parent.getSchema(); - tbl = PlanUtils.getIntermediateFileTableDesc(PlanUtils - .getFieldSchemasFromRowSchema(rowSchema, "")); + tbl = PlanUtils.getIntermediateFileTableDesc(PlanUtils.getFieldSchemasFromRowSchema( + rowSchema, "")); } - - dummyOp.getConf().setTbl(tbl); - - //let the dummy op be the parent of mapjoin op + // let the dummy op be the parent of mapjoin op mapJoinOp.replaceParent(parent, dummyOp); List> dummyChildren = new ArrayList>(); dummyChildren.add(mapJoinOp); dummyOp.setChildOperators(dummyChildren); - - //add this dummy op to the dummp operator list + // add this dummy op to the dummp operator list dummyOperators.add(dummyOp); - } - hashTableSinkOp.setParentOperators(smallTablesParentOp); - for(Operator op: dummyOperators){ + for (Operator op : dummyOperators) { context.addDummyParentOp(op); } return null; } - } - - /** - * LocalMapJoinProcessor. 
- * - */ - public static class MapJoinMapJoinProc implements NodeProcessor { - public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, - Object... nodeOutputs) throws SemanticException { - LocalMapJoinProcCtx context = (LocalMapJoinProcCtx) ctx; - if(!nd.getName().equals("MAPJOIN")){ - return null; - } - System.out.println("Mapjoin * MapJoin"); - - return null; + public void hasGroupBy(Operator mapJoinOp, + LocalMapJoinProcCtx localMapJoinProcCtx) throws Exception { + List> childOps = mapJoinOp.getChildOperators(); + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", "GBY%"), LocalMapJoinProcFactory.getGroupByProc()); + // The dispatcher fires the processor corresponding to the closest + // matching rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(LocalMapJoinProcFactory.getDefaultProc(), + opRules, localMapJoinProcCtx); + GraphWalker ogw = new DefaultGraphWalker(disp); + // iterate over the reducer operator tree + ArrayList topNodes = new ArrayList(); + topNodes.addAll(childOps); + ogw.startWalking(topNodes, null); } } - private LocalMapJoinProcFactory() { // prevent instantiation } - } - +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (working copy) @@ -92,7 +92,6 @@ JoinDesc joinDesc = joinOp.getConf(); Byte[] order = joinDesc.getTagOrder(); int numAliases = order.length; - try { HashSet smallTableOnlySet = MapJoinProcessor.getSmallTableOnlySet(joinDesc .getConds()); @@ -115,7 +114,6 @@ if (smallTableOnlySet.contains(i)) { continue; } - // create map join task and set big table as i // deep copy a new mapred work from xml InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8")); @@ -148,9 +146,7 @@
aliasToPath.put(bigTableAlias, path); } } - } - } catch (Exception e) { e.printStackTrace(); throw new SemanticException("Generate Map Join Task Error: " + e.getMessage()); @@ -212,7 +208,6 @@ } } - @Override public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) throws SemanticException { @@ -243,19 +238,16 @@ return null; } - private JoinOperator getJoinOp(MapRedTask task) throws SemanticException { if (task.getWork() == null) { return null; } - Operator reducerOp = task.getWork().getReducer(); if (reducerOp instanceof JoinOperator) { return (JoinOperator) reducerOp; } else { return null; } - } } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (working copy) @@ -118,7 +118,6 @@ private long hashTableScale; private boolean isAbort = false; - public static class HashTableSinkObjectCtx { ObjectInspector standardOI; SerDe serde; @@ -244,14 +243,13 @@ for (int pos = 0; pos < numAliases; pos++) { metadataValueTag[pos] = -1; } - mapJoinTables = new HashMap>(); int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD); float hashTableLoadFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR); - float hashTableMaxMemoryUsage = HiveConf.getFloatVar(hconf, - HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE); + float hashTableMaxMemoryUsage = this.getConf().getHashtableMemoryUsage(); + hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE); if (hashTableScale <= 0) { hashTableScale = 1; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (revision 1042768) +++ 
ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (working copy) @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.exec; import java.io.Serializable; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; @@ -133,6 +135,9 @@ // new Key ObjectInspectors are objectInspectors from the parent transient StructObjectInspector newKeyObjectInspector; transient StructObjectInspector currentKeyObjectInspector; + public static MemoryMXBean memoryMXBean; + private long maxMemory; + private float memoryThreshold; /** * This is used to store the position and field names for variable length @@ -373,6 +378,9 @@ if (hashAggr) { computeMaxEntriesHashAggr(hconf); } + memoryMXBean = ManagementFactory.getMemoryMXBean(); + maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + memoryThreshold = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); initializeChildren(hconf); } @@ -386,8 +394,8 @@ * aggregation only **/ private void computeMaxEntriesHashAggr(Configuration hconf) throws HiveException { - maxHashTblMemory = (long) (HiveConf.getFloatVar(hconf, - HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY) * Runtime.getRuntime().maxMemory()); + float memoryPercentage = this.getConf().getGroupByMemoryUsage(); + maxHashTblMemory = (long) (memoryPercentage * Runtime.getRuntime().maxMemory()); estimateRowSize(); } @@ -824,10 +832,18 @@ **/ private boolean shouldBeFlushed(KeyWrapper newKeys) { int numEntries = hashAggregations.size(); + long usedMemory; + float rate; // The fixed size for the aggregation class is already known. Get the // variable portion of the size every NUMROWSESTIMATESIZE rows. 
if ((numEntriesHashTable == 0) || ((numEntries % NUMROWSESTIMATESIZE) == 0)) { + // check how much memory is left + usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); + rate = (float) usedMemory / (float) maxMemory; + if(rate > memoryThreshold){ + return true; + } for (Integer pos : keyPositionsSize) { Object key = newKeys.getKeyArray()[pos.intValue()]; // Ignore nulls Index: ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (working copy) @@ -161,7 +161,6 @@ int size = mHash.size(); long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); double rate = (double) usedMemory / (double) maxMemory; - long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t" + size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate)); if (rate > (double) maxMemoryUsage) { Index: ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java (working copy) @@ -79,6 +79,7 @@ private LinkedHashMap>> aliasBucketFileNameMapping; private LinkedHashMap bucketFileNameMapping; + private float hashtableMemoryUsage; public HashTableSinkDesc() { bucketFileNameMapping = new LinkedHashMap(); @@ -125,6 +126,14 @@ } } + public float getHashtableMemoryUsage() { + return hashtableMemoryUsage; + } + + public void setHashtableMemoryUsage(float hashtableMemoryUsage) { + this.hashtableMemoryUsage = hashtableMemoryUsage; + } + public boolean isHandleSkewJoin() { return
handleSkewJoin; } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (working copy) @@ -45,7 +45,6 @@ private String tmpFileURI; private String stageID; - private List> dummyParentOp ; public MapredLocalWork() { @@ -90,7 +89,6 @@ this.stageID = stageID; } - public void setAliasToWork( final LinkedHashMap> aliasToWork) { this.aliasToWork = aliasToWork; Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy) @@ -75,7 +75,6 @@ private QBJoinTree joinTree; - public MapredWork() { aliasToPartnInfo = new LinkedHashMap(); } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (working copy) @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.plan; + /** * GroupByDesc. 
* @@ -52,6 +53,7 @@ private java.util.ArrayList keys; private java.util.ArrayList aggregators; private java.util.ArrayList outputColumnNames; + private float groupByMemoryUsage; public GroupByDesc() { } @@ -61,9 +63,9 @@ final java.util.ArrayList outputColumnNames, final java.util.ArrayList keys, final java.util.ArrayList aggregators, - final boolean groupKeyNotReductionKey) { + final boolean groupKeyNotReductionKey,float groupByMemoryUsage) { this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey, - false); + false,groupByMemoryUsage); } public GroupByDesc( @@ -71,13 +73,14 @@ final java.util.ArrayList outputColumnNames, final java.util.ArrayList keys, final java.util.ArrayList aggregators, - final boolean groupKeyNotReductionKey, final boolean bucketGroup) { + final boolean groupKeyNotReductionKey, final boolean bucketGroup,float groupByMemoryUsage) { this.mode = mode; this.outputColumnNames = outputColumnNames; this.keys = keys; this.aggregators = aggregators; this.groupKeyNotReductionKey = groupKeyNotReductionKey; this.bucketGroup = bucketGroup; + this.groupByMemoryUsage = groupByMemoryUsage; } public Mode getMode() { @@ -106,6 +109,14 @@ return "unknown"; } + public float getGroupByMemoryUsage() { + return groupByMemoryUsage; + } + + public void setGroupByMemoryUsage(float groupByMemoryUsage) { + this.groupByMemoryUsage = groupByMemoryUsage; + } + public void setMode(final Mode mode) { this.mode = mode; } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1042768) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -82,12 +82,12 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; import 
org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.metadata.DummyPartition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1; @@ -2274,10 +2274,10 @@ genericUDAFEvaluators.put(entry.getKey(), genericUDAFEvaluator); } } - + float groupByMemoryUsage = HiveConf.getFloatVar(conf,HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false), new RowSchema(groupByOutputRowResolver.getColumnInfos()), + false,groupByMemoryUsage), new RowSchema(groupByOutputRowResolver.getColumnInfos()), reduceSinkOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); return op; @@ -2422,10 +2422,10 @@ groupByOutputRowResolver.putExpression(value, new ColumnInfo( field, udaf.returnType, "", false)); } - + float groupByMemoryUsage = HiveConf.getFloatVar(conf,HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - distPartAgg), new RowSchema(groupByOutputRowResolver + distPartAgg,groupByMemoryUsage), new RowSchema(groupByOutputRowResolver .getColumnInfos()), reduceSinkOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); @@ -2540,10 +2540,10 @@ genericUDAFEvaluators.put(entry.getKey(), genericUDAFEvaluator); } } - + float groupByMemoryUsage = HiveConf.getFloatVar(conf,HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false), new 
RowSchema(groupByOutputRowResolver.getColumnInfos()), + false,groupByMemoryUsage), new RowSchema(groupByOutputRowResolver.getColumnInfos()), inputOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap); return op; @@ -2847,10 +2847,10 @@ groupByOutputRowResolver2.putExpression(value, new ColumnInfo( field, udaf.returnType, "", false)); } - + float groupByMemoryUsage = HiveConf.getFloatVar(conf,HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false), new RowSchema(groupByOutputRowResolver2.getColumnInfos()), + false,groupByMemoryUsage), new RowSchema(groupByOutputRowResolver2.getColumnInfos()), reduceSinkOperatorInfo2), groupByOutputRowResolver2); op.setColumnExprMap(colExprMap); return op; @@ -4578,9 +4578,10 @@ } // Generate group-by operator + float groupByMemoryUsage = HiveConf.getFloatVar(conf,HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); Operator op = putOpInsertMap(OperatorFactory.getAndMakeChild( new GroupByDesc(mode, outputColumnNames, groupByKeys, aggregations, - false), new RowSchema(groupByOutputRowResolver.getColumnInfos()), + false,groupByMemoryUsage), new RowSchema(groupByOutputRowResolver.getColumnInfos()), inputOperatorInfo), groupByOutputRowResolver); op.setColumnExprMap(colExprMap);