diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index ce43120..7706b62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -93,7 +93,7 @@
       return retval;
     } else {
       int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+      convertJoinSMBJoin(joinOp, context, pos, 0, false);
       return null;
     }
   }
@@ -135,7 +135,7 @@
     } else {
       // only case is full outer join with SMB enabled which is not possible. Convert to regular
       // join.
-      convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
+      convertJoinSMBJoin(joinOp, context, 0, 0, false);
       return null;
     }
   }
@@ -155,7 +155,7 @@
     // we are just converting to a common merge join operator. The shuffle
     // join in map-reduce case.
     int pos = 0; // it doesn't matter which position we use in this case.
-    convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+    convertJoinSMBJoin(joinOp, context, pos, 0, false);
     return null;
   }
@@ -180,7 +180,7 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat
     // map join either based on the size. Check if we can convert to SMB join.
     if ((context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false)
         || (joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
-      convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
+      convertJoinSMBJoin(joinOp, context, 0, 0, false);
       return null;
     }
     Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
@@ -188,7 +188,7 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat
       String selector = HiveConf.getVar(context.parseContext.getConf(),
          HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR);
       bigTableMatcherClass =
-          (Class<? extends BigTableSelectorForAutoSMJ>) JavaUtils.loadClass(selector);
+          JavaUtils.loadClass(selector);
     } catch (ClassNotFoundException e) {
       throw new SemanticException(e.getMessage());
     }
@@ -210,18 +210,18 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
       int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+      convertJoinSMBJoin(joinOp, context, pos, 0, false);
       return null;
     }
     if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
       convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
-          tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
+          tezBucketJoinProcCtx.getNumBuckets(), true);
     } else {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
       int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+      convertJoinSMBJoin(joinOp, context, pos, 0, false);
     }
     return null;
   }
@@ -229,7 +229,7 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat
   // replaces the join operator with a new CommonJoinOperator, removes the
   // parent reduce sinks
   private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
-      int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
+      int mapJoinConversionPos, int numBuckets, boolean adjustParentsChildren)
       throws SemanticException {
     MapJoinDesc mapJoinDesc = null;
     if (adjustParentsChildren) {
@@ -253,7 +253,7 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont
     CommonMergeJoinOperator mergeJoinOp =
         (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
-        isSubQuery, mapJoinConversionPos, mapJoinDesc), joinOp.getSchema());
+        mapJoinConversionPos, mapJoinDesc), joinOp.getSchema());
     int numReduceSinks = joinOp.getOpTraits().getNumReduceSinks();
     OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp
         .getOpTraits().getSortCols(), numReduceSinks);
@@ -363,8 +363,6 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon
     Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>();
     bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets());
     joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping);
-    LOG.info("Setting legacy map join to " + (!tezBucketJoinProcCtx.isSubQuery()));
-    joinDesc.setCustomBucketMapJoin(!tezBucketJoinProcCtx.isSubQuery());
     return true;
   }
@@ -405,13 +403,10 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
       }
     }

-    boolean isSubQuery = false;
     if (numBuckets < 0) {
-      isSubQuery = true;
       numBuckets = bigTableRS.getConf().getNumReducers();
     }
     tezBucketJoinProcCtx.setNumBuckets(numBuckets);
-    tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
     LOG.info("We can convert the join to an SMB join.");
     return true;
   }
@@ -457,13 +452,10 @@ private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
      * this is the case when the big table is a sub-query and is probably already bucketed by the
      * join column in say a group by operation
      */
-    boolean isSubQuery = false;
     if (numBuckets < 0) {
-      isSubQuery = true;
       numBuckets = rs.getConf().getNumReducers();
     }
     tezBucketJoinProcCtx.setNumBuckets(numBuckets);
-    tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
     return true;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index b184cf4..5f0c0ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -31,8 +31,10 @@
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -64,7 +66,7 @@
   /* (non-Javadoc)
    * This processor addresses the RS-MJ case that occurs in tez on the small/hash
-   * table side of things. The work that RS will be a part of must be connected 
+   * table side of things. The work that RS will be a part of must be connected
    * to the MJ work via a broadcast edge.
    * We should not walk down the tree when we encounter this pattern because:
    * the type of work (map work or reduce work) needs to be determined
@@ -91,7 +93,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
     parentRS.setSkipTag(true);
     // remember the original parent list before we start modifying it.
     if (!context.mapJoinParentMap.containsKey(mapJoinOp)) {
-      List<Operator<?>> parents = new ArrayList(mapJoinOp.getParentOperators());
+      List<Operator<?>> parents = new ArrayList<Operator<?>>(mapJoinOp.getParentOperators());
       context.mapJoinParentMap.put(mapJoinOp, parents);
     }
@@ -173,9 +175,12 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
         parentRS.getConf().setReducerTraits(EnumSet.of(FIXED));
         numBuckets = (Integer) joinConf.getBigTableBucketNumMapping().values().toArray()[0];
-        if (joinConf.getCustomBucketMapJoin()) {
+        Operator<?> rootOp = OperatorUtils.findSingleOperatorUpstream(mapJoinOp.getParentOperators()
+            .get(joinConf.getPosBigTable()), TableScanOperator.class);
+
+        if (rootOp instanceof TableScanOperator) { // we will run in mapper
           edgeType = EdgeType.CUSTOM_EDGE;
-        } else {
+        } else { // we will run in reducer
           edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
         }
       }
@@ -218,8 +223,8 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
       }
       linkWorkMap.put(parentWork, edgeProp);
       context.linkOpWithWorkMap.put(mapJoinOp, linkWorkMap);
-      
-      List<ReduceSinkOperator> reduceSinks 
+
+      List<ReduceSinkOperator> reduceSinks
          = context.linkWorkWithReduceSinkMap.get(parentWork);
       if (reduceSinks == null) {
         reduceSinks = new ArrayList<ReduceSinkOperator>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java
index 821f60c..3e0ea47 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java
@@ -18,28 +18,15 @@
 package org.apache.hadoop.hive.ql.optimizer;

-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.hive.conf.HiveConf;

 public class TezBucketJoinProcCtx extends BucketJoinProcCtx {
-  // determines if we need to use custom edge or one-to-one edge
-  boolean isSubQuery = false;
   int numBuckets = -1;

   public TezBucketJoinProcCtx(HiveConf conf) {
     super(conf);
   }

-  public void setIsSubQuery (boolean isSubQuery) {
-    this.isSubQuery = isSubQuery;
-  }
-
-  public boolean isSubQuery () {
-    return isSubQuery;
-  }
-
   public void setNumBuckets(int numBuckets) {
     this.numBuckets = numBuckets;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
index f3728f1..2354139 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java
@@ -24,24 +24,18 @@ public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable {
   private static final long serialVersionUID = 1L;

   private int numBuckets;
-  private boolean isSubQuery;
   private int mapJoinConversionPos;

   CommonMergeJoinDesc() {
   }

-  public CommonMergeJoinDesc(int numBuckets, boolean isSubQuery, int mapJoinConversionPos,
+  public CommonMergeJoinDesc(int numBuckets, int mapJoinConversionPos,
       MapJoinDesc joinDesc) {
     super(joinDesc);
     this.numBuckets = numBuckets;
-    this.isSubQuery = isSubQuery;
     this.mapJoinConversionPos = mapJoinConversionPos;
   }

-  public boolean getCustomMerge() {
-    return isSubQuery;
-  }
-
   public int getNumBuckets() {
     return numBuckets;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index 9fdd417..bae81e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -53,9 +53,6 @@
   private Map<Integer, String> parentToInput = new HashMap<Integer, String>();
   private Map<Integer, Long> parentKeyCounts = new HashMap<Integer, Long>();

-  // for tez. used to remember which type of a Bucket Map Join this is.
-  private boolean customBucketMapJoin;
-
   // table alias (small) --> input file name (big) --> target file names (small)
   private Map<String, Map<String, List<String>>> aliasBucketFileNameMapping;
   private Map<String, Integer> bigTableBucketNumMapping;
@@ -90,7 +87,6 @@ public MapJoinDesc(MapJoinDesc clone) {
     this.dumpFilePrefix = clone.dumpFilePrefix;
     this.parentToInput = clone.parentToInput;
     this.parentKeyCounts = clone.parentKeyCounts;
-    this.customBucketMapJoin = clone.customBucketMapJoin;
   }

   public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
@@ -327,14 +323,7 @@ public float getHashTableMemoryUsage() {
     return hashtableMemoryUsage;
   }

-  public void setCustomBucketMapJoin(boolean customBucketMapJoin) {
-    this.customBucketMapJoin = customBucketMapJoin;
-  }
-
-  public boolean getCustomBucketMapJoin() {
-    return this.customBucketMapJoin;
-  }
-
+  @Override
   public boolean isMapSideJoin() {
     return true;
   }
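
Net effect of the patch: rather than threading an isSubQuery/customBucketMapJoin flag through TezBucketJoinProcCtx, CommonMergeJoinDesc, and MapJoinDesc, ReduceSinkMapJoinProc now derives the Tez edge type structurally. If the big-table branch of the map join is rooted in a TableScanOperator, the join runs in a mapper and gets a CUSTOM_EDGE; otherwise the big table is intermediate (already shuffled) data, the join runs in a reducer, and a CUSTOM_SIMPLE_EDGE is used. Below is a minimal, self-contained sketch of that upstream-root check. The Op and TableScanOp classes and the findSingleUpstream helper are simplified stand-ins for illustration only; they are not Hive's operator hierarchy, and OperatorUtils.findSingleOperatorUpstream may differ in detail.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for a node in Hive's operator DAG.
class Op {
    final String name;
    final List<Op> parents = new ArrayList<>();
    Op(String name) { this.name = name; }
}

// Stand-in for TableScanOperator: a leaf that reads a table directly.
class TableScanOp extends Op {
    TableScanOp(String name) { super(name); }
}

public class UpstreamRootCheck {
    // Walk from start toward the roots, collecting operators of the requested
    // class; report a match only if it is unique, mirroring the "single
    // operator upstream" contract the patch relies on.
    static <T extends Op> T findSingleUpstream(Op start, Class<T> clazz) {
        Set<T> found = new HashSet<>();
        Set<Op> visited = new HashSet<>();
        Deque<Op> queue = new ArrayDeque<>();
        queue.add(start);
        while (!queue.isEmpty()) {
            Op op = queue.poll();
            if (!visited.add(op)) {
                continue; // already explored this branch
            }
            if (clazz.isInstance(op)) {
                found.add(clazz.cast(op));
            }
            queue.addAll(op.parents);
        }
        return found.size() == 1 ? found.iterator().next() : null;
    }

    public static void main(String[] args) {
        // Big-table branch: table scan -> filter -> (map join input).
        Op scan = new TableScanOp("ts_big");
        Op filter = new Op("filter");
        filter.parents.add(scan);

        TableScanOp root = findSingleUpstream(filter, TableScanOp.class);
        // Rooted in a table scan: the big table is read directly in a mapper,
        // so the bucket map join can use CUSTOM_EDGE. A branch rooted in an
        // intermediate stage yields null here -> CUSTOM_SIMPLE_EDGE.
        System.out.println(root != null ? "CUSTOM_EDGE" : "CUSTOM_SIMPLE_EDGE");
    }
}

The design point is that the edge decision is recomputed from the plan's shape at the point of use, so removing the flags eliminates a piece of state that had to be kept consistent across four classes.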