diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 49706b1..6a970df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -63,7 +63,12 @@ private void initialize(HiveConf hiveConf) {
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER)) {
       resolvers.add(new IndexWhereResolver());
     }
-    resolvers.add(new MapJoinResolver());
+    if (HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      resolvers.add(new SparkMapJoinResolver());
+    } else {
+      resolvers.add(new MapJoinResolver());
+    }
+
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEMETADATAONLYQUERIES)) {
       resolvers.add(new MetadataOnlyOptimizer());
    }
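The hunk above keys the map-join physical resolver on hive.execution.engine. A minimal standalone sketch of that selection pattern follows; ResolverSelectionSketch, SparkResolver and MapReduceResolver are hypothetical stand-ins for the Hive classes, not part of the patch.

import java.util.ArrayList;
import java.util.List;

public class ResolverSelectionSketch {
  interface PhysicalPlanResolver { }
  static class SparkResolver implements PhysicalPlanResolver { }
  static class MapReduceResolver implements PhysicalPlanResolver { }

  static List<PhysicalPlanResolver> buildResolvers(String executionEngine) {
    List<PhysicalPlanResolver> resolvers = new ArrayList<>();
    // Pick the Spark-specific resolver only when the engine is "spark";
    // every other engine keeps the original MR-style resolver.
    if ("spark".equalsIgnoreCase(executionEngine)) {
      resolvers.add(new SparkResolver());
    } else {
      resolvers.add(new MapReduceResolver());
    }
    return resolvers;
  }

  public static void main(String[] args) {
    System.out.println(buildResolvers("spark").get(0).getClass().getSimpleName()); // SparkResolver
    System.out.println(buildResolvers("mr").get(0).getClass().getSimpleName());    // MapReduceResolver
  }
}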
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinProcFactory.java
new file mode 100644
index 0000000..71d2d92
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinProcFactory.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+/**
+ * Cloned from LocalMapJoinProcFactory.
+ * Node processor factory for the map-join resolver. For a map join, it replaces
+ * the small tables' reduce-sink operators with hash-table sink operators. If the
+ * map join is followed by a group by, the hash-table sink operator should be
+ * configured to use less memory to avoid OOM in the group-by operator.
+ */
+public final class SparkMapJoinProcFactory {
+  private static final Log LOG = LogFactory.getLog(SparkMapJoinProcFactory.class);
+
+  public static NodeProcessor getJoinProc() {
+    return new SparkMapJoinProcessor();
+  }
+
+  public static NodeProcessor getGroupByProc() {
+    return new MapJoinFollowedByGroupByProcessor();
+  }
+
+  public static NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+          Object... nodeOutputs) throws SemanticException {
+        return null;
+      }
+    };
+  }
+
+  /**
+   * MapJoinFollowedByGroupByProcessor.
+   */
+  public static class MapJoinFollowedByGroupByProcessor implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs)
+        throws SemanticException {
+      ParseContext context = (ParseContext) ctx;
+      if (!nd.getName().equals("GBY")) {
+        return null;
+      }
+      context.setFollowedByGroupBy(true);
+      GroupByOperator groupByOp = (GroupByOperator) nd;
+      float groupByMemoryUsage = context.getConf().getFloatVar(
+          HiveConf.ConfVars.HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY);
+      groupByOp.getConf().setGroupByMemoryUsage(groupByMemoryUsage);
+      return null;
+    }
+  }
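The processor above caps the hash-aggregation memory of a group by that consumes a map join, and SparkMapJoinProcessor (below) likewise picks a smaller hash-table fraction when a group by follows. A sketch of how the two fractions are meant to trade off; the 0.55/0.90 constants are illustrative stand-ins for the hive.mapjoin.followby.gby.localtask.max.memory.usage and hive.mapjoin.localtask.max.memory.usage settings, not authoritative defaults.

public class HashTableMemorySketch {
  static final float FOLLOWED_BY_GBY_MAX_MEMORY = 0.55f; // smaller share: the group by needs memory too
  static final float DEFAULT_MAX_MEMORY = 0.90f;         // hash table may use most of the heap

  static float hashTableMemoryFraction(boolean followedByGroupBy) {
    // Mirrors the intent of SparkMapJoinProcessor: leave head-room for the
    // group-by hash aggregation when it shares a task with the map-join hash table.
    return followedByGroupBy ? FOLLOWED_BY_GBY_MAX_MEMORY : DEFAULT_MAX_MEMORY;
  }

  public static void main(String[] args) {
    System.out.println(hashTableMemoryFraction(true));   // 0.55
    System.out.println(hashTableMemoryFraction(false));  // 0.9
  }
}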
+
+  /**
+   * SparkMapJoinProcessor.
+   */
+  public static class SparkMapJoinProcessor implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs)
+        throws SemanticException {
+      ParseContext context = (ParseContext) ctx;
+      if (!nd.getName().equals("MAPJOIN")) {
+        return null;
+      }
+      MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
+      try {
+        hasGroupBy(mapJoinOp, context);
+      } catch (Exception e) {
+        LOG.warn("Failed to check whether the map join is followed by a group by", e);
+      }
+
+      MapJoinDesc mapJoinDesc = mapJoinOp.getConf();
+
+      // the map join should not be affected by join reordering
+      mapJoinDesc.resetOrder();
+
+      HiveConf conf = context.getConf();
+      // set the hash table memory usage
+      float hashtableMemoryUsage;
+      if (context.isFollowedByGroupBy()) {
+        hashtableMemoryUsage = conf.getFloatVar(
+            HiveConf.ConfVars.HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE);
+      } else {
+        hashtableMemoryUsage = conf.getFloatVar(
+            HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
+      }
+      mapJoinDesc.setHashTableMemoryUsage(hashtableMemoryUsage);
+      LOG.info("Setting max memory usage to " + hashtableMemoryUsage + " for table sink "
+          + (context.isFollowedByGroupBy() ? "" : "not ") + "followed by group by");
+
+      HashTableSinkDesc hashTableSinkDesc = new HashTableSinkDesc(mapJoinDesc);
+      HashTableSinkOperator hashTableSinkOp = (HashTableSinkOperator) OperatorFactory
+          .get(hashTableSinkDesc);
+
+      // get the position of the big table
+      int bigTable = mapJoinDesc.getPosBigTable();
+
+      // todo: support tez/vectorization
+      boolean useNonStaged = conf.getBoolVar(
+          HiveConf.ConfVars.HIVECONVERTJOINUSENONSTAGED) &&
+          conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("mr") &&
+          !conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
+
+      // the parent ops for hashTableSinkOp
+      List<Operator<? extends OperatorDesc>> smallTablesParentOp =
+          new ArrayList<Operator<? extends OperatorDesc>>();
+      List<Operator<? extends OperatorDesc>> dummyOperators =
+          new ArrayList<Operator<? extends OperatorDesc>>();
+      List<Operator<? extends OperatorDesc>> directOperators =
+          new ArrayList<Operator<? extends OperatorDesc>>();
+      // get all parents
+      List<Operator<? extends OperatorDesc>> parentsOp = mapJoinOp.getParentOperators();
+      for (byte i = 0; i < parentsOp.size(); i++) {
+        if (i == bigTable) {
+          smallTablesParentOp.add(null);
+          directOperators.add(null);
+          continue;
+        }
+        Operator<? extends OperatorDesc> parent = parentsOp.get(i);
+        boolean directFetchable = useNonStaged &&
+            (parent instanceof TableScanOperator || parent instanceof MapJoinOperator);
+        if (directFetchable) {
+          // no filter, no projection: no need to stage
+          smallTablesParentOp.add(null);
+          directOperators.add(parent);
+          hashTableSinkDesc.getKeys().put(i, null);
+          hashTableSinkDesc.getExprs().put(i, null);
+          hashTableSinkDesc.getFilters().put(i, null);
+        } else {
+          // keep the parent id correct
+          smallTablesParentOp.add(parent);
+          directOperators.add(null);
+          int[] valueIndex = mapJoinDesc.getValueIndex(i);
+          if (valueIndex != null) {
+            // remove values that are part of the key exprs;
+            // the value schema is already fixed in MapJoinProcessor#convertJoinOpMapJoinOp
+            List<ExprNodeDesc> newValues = new ArrayList<ExprNodeDesc>();
+            List<ExprNodeDesc> values = hashTableSinkDesc.getExprs().get(i);
+            for (int index = 0; index < values.size(); index++) {
+              if (valueIndex[index] < 0) {
+                newValues.add(values.get(index));
+              }
+            }
+            hashTableSinkDesc.getExprs().put(i, newValues);
+          }
+        }
+        // let the hash table sink op be the child of this parent
+        parent.replaceChild(mapJoinOp, hashTableSinkOp);
+        if (directFetchable) {
+          parent.setChildOperators(null);
+        }
+
+        // create a new HashTableDummyOperator, which shares the table desc
+        HashTableDummyDesc desc = new HashTableDummyDesc();
+        HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc);
+        TableDesc tbl;
+
+        if (parent.getSchema() == null) {
+          if (parent instanceof TableScanOperator) {
+            tbl = ((TableScanOperator) parent).getTableDesc();
+          } else {
+            throw new SemanticException("Expected parent operator of type TableScanOperator. "
+                + "Found " + parent.getClass().getName() + " instead.");
+          }
+        } else {
+          // get the parent schema
+          RowSchema rowSchema = parent.getSchema();
+          tbl = PlanUtils.getIntermediateFileTableDesc(PlanUtils.getFieldSchemasFromRowSchema(
+              rowSchema, ""));
+        }
+        dummyOp.getConf().setTbl(tbl);
+        // let the dummy op be the parent of the map join op
+        mapJoinOp.replaceParent(parent, dummyOp);
+        List<Operator<? extends OperatorDesc>> dummyChildren =
+            new ArrayList<Operator<? extends OperatorDesc>>();
+        dummyChildren.add(mapJoinOp);
+        dummyOp.setChildOperators(dummyChildren);
+        // add this dummy op to the dummy operator list
+        dummyOperators.add(dummyOp);
+      }
+      hashTableSinkOp.setParentOperators(smallTablesParentOp);
+
+      // TODO Suhas: investigate what this is
+      /*
+      if (hasAnyDirectFetch(directOperators)) {
+        context.addDirectWorks(mapJoinOp, directOperators);
+      }*/
+      return null;
+    }
+
+    private boolean hasAnyDirectFetch(List<Operator<? extends OperatorDesc>> directOperators) {
+      for (Operator<? extends OperatorDesc> operator : directOperators) {
+        if (operator != null) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public void hasGroupBy(Operator<? extends OperatorDesc> mapJoinOp,
+        ParseContext pc) throws Exception {
+      List<Operator<? extends OperatorDesc>> childOps = mapJoinOp.getChildOperators();
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      opRules.put(new RuleRegExp("R1", GroupByOperator.getOperatorName() + "%"),
+          SparkMapJoinProcFactory.getGroupByProc());
+      // The dispatcher fires the processor corresponding to the closest
+      // matching rule and passes the context along
+      Dispatcher disp = new DefaultRuleDispatcher(SparkMapJoinProcFactory.getDefaultProc(),
+          opRules, pc);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+      // walk the operator tree below the map join
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(childOps);
+      ogw.startWalking(topNodes, null);
+    }
+  }
+
+  private SparkMapJoinProcFactory() {
+    // prevent instantiation
+  }
+}
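SparkMapJoinProcessor rewires each small-table branch so it ends in a hash-table sink while the map join reads from a dummy placeholder parent. The sketch below uses toy Op classes (not Hive's Operator API) to show the replaceChild/replaceParent bookkeeping in isolation.

import java.util.ArrayList;
import java.util.List;

class Op {
  final String name;
  final List<Op> parents = new ArrayList<>();
  final List<Op> children = new ArrayList<>();
  Op(String name) { this.name = name; }

  void addChild(Op c) { children.add(c); c.parents.add(this); }

  void replaceChild(Op oldChild, Op newChild) {
    children.set(children.indexOf(oldChild), newChild);
    newChild.parents.add(this);
  }

  void replaceParent(Op oldParent, Op newParent) {
    parents.set(parents.indexOf(oldParent), newParent);
    newParent.children.add(this);
  }
}

public class MapJoinRewireSketch {
  public static void main(String[] args) {
    Op smallTableScan = new Op("TS_small");
    Op mapJoin = new Op("MAPJOIN");
    smallTableScan.addChild(mapJoin);

    // The small-table branch now ends in a hash table sink ...
    Op hashTableSink = new Op("HASHTABLESINK");
    smallTableScan.replaceChild(mapJoin, hashTableSink);
    // ... and the map join reads from a dummy placeholder instead.
    Op dummy = new Op("HASHTABLEDUMMY");
    mapJoin.replaceParent(smallTableScan, dummy);

    System.out.println(smallTableScan.children.get(0).name); // HASHTABLESINK
    System.out.println(mapJoin.parents.get(0).name);         // HASHTABLEDUMMY
  }
}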
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
new file mode 100644
index 0000000..da4d7ae
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is similar to MapJoinResolver. The difference, though, is that here
+ * we split a SparkWork into two SparkWorks: one containing all the BaseWorks for the
+ * small tables, and the other containing the BaseWork for the big table.
+ *
+ * We also set up the dependency between the two new SparkWorks.
+ */
+public class SparkMapJoinResolver implements PhysicalPlanResolver {
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+
+    Dispatcher dispatcher = new SparkMapJoinTaskDispatcher(pctx);
+    TaskGraphWalker graphWalker = new TaskGraphWalker(dispatcher);
+
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+    graphWalker.startWalking(topNodes, null);
+    return pctx;
+  }
+
+  class SparkMapJoinTaskDispatcher implements Dispatcher {
+
+    private PhysicalContext physicalContext;
+
+    public SparkMapJoinTaskDispatcher(PhysicalContext pc) {
+      physicalContext = pc;
+    }
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nos)
+        throws SemanticException {
+      Task<? extends Serializable> currentTask = (Task<? extends Serializable>) nd;
+      if (currentTask instanceof SparkTask
+          && currentTask.getTaskTag() == Task.CONVERTED_MAPJOIN) {
+        SparkWork bigTblWork = (SparkWork) currentTask.getWork();
+        Set<BaseWork> smallTblMapWorks = bigTblWork.getRoots();
+
+        if (smallTblMapWorks.isEmpty()) {
+          // This shouldn't happen
+          return null;
+        }
+
+        // Assumption:
+        // All root works in the bigTblWork are MapWorks, and all of these MapWorks
+        // share the same child, which is the MapWork for the big table.
+        BaseWork childWork = null;
+        for (BaseWork bw : smallTblMapWorks) {
+          // TODO: LOTS of assertions - but hopefully some of them can be safely removed.
+          Preconditions.checkArgument(bw instanceof MapWork,
+              "AssertionError: BaseWork " + bw.getName() + " should be a MapWork, "
+              + "but found to be a " + bw.getClass().getName());
+          Preconditions.checkArgument(bigTblWork.getChildren(bw).size() == 1,
+              "AssertionError: BaseWork " + bw.getName() + " for small table is"
+              + " expected to have one child (the MapWork for the big table), but "
+              + bigTblWork.getChildren(bw).size() + " children were found");
+          BaseWork currChildWork = bigTblWork.getChildren(bw).get(0);
+          Preconditions.checkArgument(currChildWork instanceof MapWork,
+              "AssertionError: the big table work is expected to be a MapWork,"
+              + " but " + currChildWork.getClass().getName() + " was found");
+
+          physicalContext = replaceReduceSinkWithHashTableSink(bw, physicalContext);
+          if (childWork == null) {
+            childWork = bigTblWork.getChildren(bw).get(0);
+          } else {
+            Preconditions.checkArgument(childWork == currChildWork,
+                "AssertionError: all small table MapWorks should share the"
+                + " same child work, which is the MapWork for the big table");
+          }
+          bigTblWork.remove(bw);
+        }
+
+        // Now we create two SparkTasks - one for the small tables and one for
+        // the big table.
+        HiveConf conf = physicalContext.conf;
+        SparkWork smallTblWork = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+        SparkTask smallTblTask = (SparkTask) TaskFactory.get(smallTblWork, conf);
+        smallTblWork.addAll(smallTblMapWorks);
+        smallTblTask.addDependentTask(currentTask);
+
+        // Set up dependencies
+        List<Task<? extends Serializable>> parentTasks = currentTask.getParentTasks();
+        currentTask.setParentTasks(null);
+        if (parentTasks != null) {
+          for (Task<? extends Serializable> tsk : parentTasks) {
+            tsk.addDependentTask(smallTblTask);
+            tsk.removeDependentTask(currentTask);
+          }
+        } else {
+          // TODO: need to handle ConditionalTask here
+          physicalContext.addToRootTask(smallTblTask);
+          physicalContext.removeFromRootTask(currentTask);
+        }
+      }
+      return null;
+    }
+
+    /**
+     * For a map join, replace the reduce-sink operators with HashTableSinkOperators
+     * in the operator tree of the given small-table work.
+     * @param smallTableMapWork the work for a small table
+     * @param phyCtx the current physical context
+     * @return the updated physical context
+     * @throws SemanticException
+     */
+    private PhysicalContext replaceReduceSinkWithHashTableSink(BaseWork smallTableMapWork,
+        PhysicalContext phyCtx) throws SemanticException {
+      ParseContext pc = phyCtx.getParseContext();
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      opRules.put(new RuleRegExp("R1", MapJoinOperator.getOperatorName() + "%"),
+          SparkMapJoinProcFactory.getJoinProc());
+      // The dispatcher fires the processor corresponding to the closest
+      // matching rule and passes the context along
+      Dispatcher disp = new DefaultRuleDispatcher(SparkMapJoinProcFactory.getDefaultProc(),
+          opRules, pc);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+      // walk the operator tree of the small-table MapWork
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      if (smallTableMapWork instanceof MapWork) {
+        topNodes.addAll(((MapWork) smallTableMapWork).getAliasToWork().values());
+      }
+
+      ogw.startWalking(topNodes, null);
+      phyCtx.setParseContext(pc);
+      return phyCtx;
+    }
+
+  }
+}
+
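SparkMapJoinTaskDispatcher pulls the small-table MapWorks out of the converted SparkWork and creates a new SparkTask that must finish before the big-table task runs, so the hash tables exist before the map join probes them. A toy model of that split and dependency wiring; ToyTask and the work names are hypothetical, not Hive's classes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class ToyTask {
  final String name;
  final List<ToyTask> dependents = new ArrayList<>();
  ToyTask(String name) { this.name = name; }
  void addDependentTask(ToyTask t) { dependents.add(t); }
}

public class SparkWorkSplitSketch {
  public static void main(String[] args) {
    // Originally one task holds the small-table works plus the big-table map work.
    Set<String> allWorks = new LinkedHashSet<>(
        Arrays.asList("Map 1 (small)", "Map 2 (small)", "Map 3 (big)"));

    ToyTask bigTableTask = new ToyTask("SparkTask[big table]");
    ToyTask smallTableTask = new ToyTask("SparkTask[small tables]");

    // The small-table works are pulled out of the original task ...
    allWorks.removeIf(w -> w.contains("small"));
    // ... and the new task becomes a prerequisite of the big-table task.
    smallTableTask.addDependentTask(bigTableTask);

    System.out.println("big-table task now owns: " + allWorks);
    System.out.println(smallTableTask.name + " -> " + smallTableTask.dependents.get(0).name);
  }
}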
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 8215c26..b7ca11d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
@@ -54,7 +54,6 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-
 /**
  * Parse Context: The current parse context. This is passed to the optimizer
  * which then transforms the operator tree using the parse context. All the
@@ -65,7 +64,7 @@
  *
  **/
-public class ParseContext {
+public class ParseContext implements NodeProcessorCtx {
   private QB qb;
   private ASTNode ast;
   private HashMap<TableScanOperator, ExprNodeDesc> opToPartPruner;
@@ -113,6 +112,7 @@ private TableDesc fetchTabledesc;
   private Operator fetchSource;
   private ListSinkOperator fetchSink;
+  private boolean isFollowedByGroupBy;
 
   public ParseContext() {
   }
@@ -673,4 +673,13 @@ public ListSinkOperator getFetchSink() {
   public void setFetchSink(ListSinkOperator fetchSink) {
     this.fetchSink = fetchSink;
   }
+
+  public boolean isFollowedByGroupBy() {
+    return isFollowedByGroupBy;
+  }
+
+  public void setFollowedByGroupBy(boolean isFollowedByGroupBy) {
+    this.isFollowedByGroupBy = isFollowedByGroupBy;
+  }
+
 }
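ParseContext now implements NodeProcessorCtx so it can be handed directly to DefaultRuleDispatcher, and the new isFollowedByGroupBy flag is how MapJoinFollowedByGroupByProcessor communicates with SparkMapJoinProcessor during a single walk. A minimal sketch of that flag-passing pattern; ProcessorCtx, PlanCtx and Processor are hypothetical stand-ins, not Hive's interfaces.

public class ContextFlagSketch {
  interface ProcessorCtx { }                     // stands in for NodeProcessorCtx

  static class PlanCtx implements ProcessorCtx { // stands in for ParseContext
    private boolean followedByGroupBy;
    boolean isFollowedByGroupBy() { return followedByGroupBy; }
    void setFollowedByGroupBy(boolean b) { followedByGroupBy = b; }
  }

  interface Processor { void process(String operatorName, ProcessorCtx ctx); }

  static final Processor GROUP_BY_PROC = (op, ctx) -> {
    if (op.equals("GBY")) {
      // Recorded during the walk; a later processor reads it from the same context.
      ((PlanCtx) ctx).setFollowedByGroupBy(true);
    }
  };

  public static void main(String[] args) {
    PlanCtx ctx = new PlanCtx();
    GROUP_BY_PROC.process("GBY", ctx);
    // The map-join processor can now size its hash table accordingly.
    System.out.println("followed by group by? " + ctx.isFollowedByGroupBy());
  }
}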
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 795a5d7..1ad5648 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.lib.TypeRule;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
@@ -51,6 +54,8 @@ import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.spark.SetSparkReducerParallelism;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkMapJoinOptimizer;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -112,9 +117,8 @@ protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
         ReduceSinkOperator.getOperatorName() + "%"), new SetSparkReducerParallelism());
 
-    // TODO: need to research and verify support convert join to map join optimization.
-    //opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
-    //  JoinOperator.getOperatorName() + "%"), new SparkMapJoinOptimizer());
+    opRules.put(new RuleRegExp("Convert Join to Map-join",
+        JoinOperator.getOperatorName() + "%"), new SparkMapJoinOptimizer());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -146,8 +150,8 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
     opRules.put(new RuleRegExp("Split Work - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"), genSparkWork);
 
-    //opRules.put(new RuleRegExp("No more walking on ReduceSink-MapJoin",
-    //  MapJoinOperator.getOperatorName() + "%"), new SparkReduceSinkMapJoinProc());
+    opRules.put(new RuleRegExp("No more walking on ReduceSink-MapJoin",
+        MapJoinOperator.getOperatorName() + "%"), new SparkReduceSinkMapJoinProc());
 
     opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink",
         FileSinkOperator.getOperatorName() + "%"),
@@ -172,6 +176,10 @@ public Object process(Node n, Stack<Node> s,
       }
     );
 
+    opRules.put(new RuleRegExp("R7",
+        MapJoinOperator.getOperatorName() + "%"),
+        MapJoinFactory.getTableScanMapJoin());
+
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
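All of the registrations above lean on the same rule-dispatch pattern: rules are regular expressions over operator names, the matching rule's processor fires, and a default processor handles everything else. The sketch below is a deliberately simplified stand-in (first match instead of Hive's closest-match scoring, hypothetical Proc type), not the real RuleRegExp/DefaultRuleDispatcher API.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class RuleDispatchSketch {
  interface Proc { String apply(String operatorName); }

  public static void main(String[] args) {
    // Rule table: regex over operator names -> processor, mirroring opRules above.
    Map<Pattern, Proc> opRules = new LinkedHashMap<>();
    opRules.put(Pattern.compile("JOIN_\\d+"), op -> "convert " + op + " to a map join");
    opRules.put(Pattern.compile("RS_\\d+"), op -> "set reducer parallelism for " + op);
    Proc defaultProc = op -> "default (no-op) for " + op;

    for (String op : new String[] {"JOIN_4", "RS_2", "FS_7"}) {
      Proc chosen = defaultProc;
      for (Map.Entry<Pattern, Proc> e : opRules.entrySet()) {
        if (e.getKey().matcher(op).matches()) {
          chosen = e.getValue();
          break; // first match; Hive instead picks the cheapest matching rule
        }
      }
      System.out.println(chosen.apply(op));
    }
  }
}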