diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 7706b62..0d02b6e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -93,7 +93,7 @@
       return retval;
     } else {
       int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false);
+      convertJoinSMBJoin(joinOp, context.conf, pos, 0, false);
       return null;
     }
   }
@@ -135,7 +135,7 @@
     } else {
       // only case is full outer join with SMB enabled which is not possible. Convert to regular
       // join.
-      convertJoinSMBJoin(joinOp, context, 0, 0, false);
+      convertJoinSMBJoin(joinOp, context.conf, 0, 0, false);
       return null;
     }
   }
@@ -155,11 +155,11 @@
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
       int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false);
+      convertJoinSMBJoin(joinOp, context.conf, pos, 0, false);
       return null;
     }

-    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
+    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context.conf, mapJoinConversionPos);
     // map join operator by default has no bucket cols and num of reduce sinks
     // reduced by 1
     mapJoinOp
@@ -180,7 +180,7 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat
     // map join either based on the size. Check if we can convert to SMB join.
     if ((context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false)
         || (joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
-      convertJoinSMBJoin(joinOp, context, 0, 0, false);
+      convertJoinSMBJoin(joinOp, context.conf, 0, 0, false);
       return null;
     }
     Class bigTableMatcherClass = null;
@@ -210,30 +210,30 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
       int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false);
+      convertJoinSMBJoin(joinOp, context.conf, pos, 0, false);
       return null;
     }

     if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
-      convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
+      convertJoinSMBJoin(joinOp, context.conf, mapJoinConversionPos,
          tezBucketJoinProcCtx.getNumBuckets(), true);
     } else {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
       int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false);
+      convertJoinSMBJoin(joinOp, context.conf, pos, 0, false);
     }
     return null;
   }

   // replaces the join operator with a new CommonJoinOperator, removes the
   // parent reduce sinks
-  private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+  public static void convertJoinSMBJoin(JoinOperator joinOp, HiveConf conf,
       int mapJoinConversionPos, int numBuckets, boolean adjustParentsChildren)
       throws SemanticException {
     MapJoinDesc mapJoinDesc = null;
     if (adjustParentsChildren) {
-      mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf,
+      mapJoinDesc = MapJoinProcessor.getMapJoinDesc(conf,
          joinOp, joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(),
          joinOp.getConf().getMapAliases(), mapJoinConversionPos, true);
     } else {
@@ -348,7 +348,7 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon
       return false;
     }

-    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition);
+    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context.conf, bigTablePosition);
     MapJoinDesc joinDesc = mapJoinOp.getConf();
     joinDesc.setBucketMapJoin(true);
@@ -622,7 +622,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
    * for tez.
    */
-  public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+  public static MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, HiveConf conf,
       int bigTablePosition) throws SemanticException {
     // bail on mux operator because currently the mux operator masks the emit keys
     // of the constituent reduce sinks.
@@ -634,7 +634,7 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo
     // can safely convert the join to a map join.
     MapJoinOperator mapJoinOp =
-        MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, joinOp,
+        MapJoinProcessor.convertJoinOpMapJoinOp(conf, joinOp,
            joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(),
            joinOp.getConf().getMapAliases(), bigTablePosition, true);
@@ -671,7 +671,7 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo
     return mapJoinOp;
   }

-  private boolean hasDynamicPartitionBroadcast(Operator parent) {
+  private static boolean hasDynamicPartitionBroadcast(Operator parent) {
     boolean hasDynamicPartitionPruning = false;
     for (Operator op : parent.getChildOperators()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
index 0ddbbb1..1eb17c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
@@ -35,6 +35,7 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.SemiJoin;
 import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
@@ -43,11 +44,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -57,6 +59,8 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
@@ -77,6 +81,7 @@
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -104,6 +109,7 @@
   private final HIVEAGGOPMODE aggMode;
   private final boolean strictMode;
   private int reduceSinkTagGenerator;
+  private final HiveConf conf;

   public static HIVEAGGOPMODE getAggOPMode(HiveConf hc) {
     HIVEAGGOPMODE aggOpMode = HIVEAGGOPMODE.NO_SKEW_NO_MAP_SIDE_AGG;
@@ -121,11 +127,11 @@ public static HIVEAGGOPMODE getAggOPMode(HiveConf hc) {
     return aggOpMode;
   }

-  public HiveOpConverter(Map<String, Operator<? extends OperatorDesc>> topOps,
-      HIVEAGGOPMODE aggMode, boolean strictMode) {
+  public HiveOpConverter(Map<String, Operator<? extends OperatorDesc>> topOps, HiveConf conf) {
     this.topOps = topOps;
-    this.aggMode = aggMode;
-    this.strictMode = strictMode;
+    this.conf = conf;
+    this.aggMode = getAggOPMode(conf);
+    this.strictMode = conf.getVar(HiveConf.ConfVars.HIVEMAPREDMODE).equalsIgnoreCase("strict");
     this.reduceSinkTagGenerator = 0;
   }
@@ -178,7 +184,7 @@ OpAttr dispatch(RelNode rn) throws SemanticException {
   /**
    * TODO: 1. PPD needs to get pushed in to TS
-   * 
+   *
    * @param scanRel
    * @return
    */
@@ -289,7 +295,7 @@ OpAttr visit(HiveJoin joinRel) throws SemanticException {
     ExprNodeDesc[][] joinKeys = extractJoinKeys(joinPredInfo, joinRel.getInputs());

     // 4. Generate Join operator
-    JoinOperator joinOp = genJoin(joinRel, joinPredInfo, children, joinKeys);
+    JoinOperator joinOp = genJoin(joinRel, joinPredInfo, children, joinKeys, conf);

     // 5. TODO: Extract condition for non-equi join elements (if any) and add it
@@ -307,10 +313,10 @@ OpAttr visit(HiveJoin joinRel) throws SemanticException {
     // 8. Return result
     return new OpAttr(null, vcolMap, joinOp);
   }
-  
+
   OpAttr visit(HiveSort sortRel) throws SemanticException {
     OpAttr inputOpAf = dispatch(sortRel.getInput());
-    
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Translating operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
          + " with row type: [" + sortRel.getRowType() + "]");
@@ -327,18 +333,18 @@ else if (sortRel.fetch == null) {
          " consists of sort+limit");
       }
     }
-    
+
     Operator inputOp = inputOpAf.inputs.get(0);
     Operator resultOp = inputOpAf.inputs.get(0);

     // 1. If we need to sort tuples based on the value of some
     // of their columns
     if (sortRel.getCollation() != RelCollations.EMPTY) {
-      
+
       // In strict mode, in the presence of order by, limit must be specified
       if (strictMode && sortRel.fetch == null) {
         throw new SemanticException(ErrorMsg.NO_LIMIT_WITH_ORDERBY.getMsg());
       }
-      
+
       // 1.a. Extract order for each column from collation
       // Generate sortCols and order
       List<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
@@ -362,12 +368,12 @@ else if (sortRel.fetch == null) {
       }
       // Use only 1 reducer for order by
       int numReducers = 1;
-      
-      // 1.b. Generate reduce sink 
+
+      // 1.b. Generate reduce sink
       resultOp = genReduceSink(resultOp, sortCols.toArray(new ExprNodeDesc[sortCols.size()]),
          -1, new ArrayList<ExprNodeDesc>(), order.toString(), numReducers, Operation.NOT_ACID, strictMode);
-      
+
       // 1.c. Generate project operator
       Map<String, ExprNodeDesc> descriptors = buildBacktrackFromReduceSink(
          (ReduceSinkOperator) resultOp, inputOp);
@@ -382,21 +388,21 @@ else if (sortRel.fetch == null) {
         LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema() + "]");
       }
     }
-    
+
     // 2. If we need to generate limit
     if (sortRel.fetch != null) {
       int limit = RexLiteral.intValue(sortRel.fetch);
       LimitDesc limitDesc = new LimitDesc(limit);
       // TODO: Set 'last limit' global property
       ArrayList<ColumnInfo> cinfoLst = createColInfos(inputOp);
-      resultOp = (LimitOperator) OperatorFactory.getAndMakeChild(
+      resultOp = OperatorFactory.getAndMakeChild(
          limitDesc, new RowSchema(cinfoLst), resultOp);
-      
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema() + "]");
       }
     }
-    
+
     // 3. Return result
     return inputOpAf.clone(resultOp);
   }
@@ -504,7 +510,7 @@ private static ReduceSinkOperator genReduceSink(Operator input, ExprNodeDesc[
     return genReduceSink(input, keys, tag, new ArrayList<ExprNodeDesc>(), "", numReducers,
        acidOperation, strictMode);
   }
-  
+
   @SuppressWarnings({ "rawtypes", "unchecked" })
   private static ReduceSinkOperator genReduceSink(Operator input, ExprNodeDesc[] keys,
      int tag, ArrayList<ExprNodeDesc> partitionCols,
@@ -614,11 +620,11 @@ private static ReduceSinkOperator genReduceSink(Operator input,
   }

   private static JoinOperator genJoin(HiveJoin hiveJoin, JoinPredicateInfo joinPredInfo,
-      List<Operator<? extends OperatorDesc>> children, ExprNodeDesc[][] joinKeys) throws SemanticException {
+      List<Operator<? extends OperatorDesc>> children, ExprNodeDesc[][] joinKeys, HiveConf conf) throws SemanticException {

     // Extract join type
     JoinType joinType = extractJoinType(hiveJoin);
-    
+
     // NOTE: Currently binary joins only
     JoinCondDesc[] joinCondns = new JoinCondDesc[1];
     joinCondns[0] = new JoinCondDesc(new JoinCond(0, 1, joinType));
@@ -687,6 +693,39 @@ private static JoinOperator genJoin(HiveJoin hiveJoin, JoinPredicateInfo joinPre
     joinOp.setColumnExprMap(colExprMap);
     joinOp.setPosToAliasMap(posToAliasMap);

+    if ("tez".equals(conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
+      int bigTbl = getBigTablePos(hiveJoin);
+      switch (hiveJoin.getJoinAlgorithm()) {
+      case COMMON_JOIN:
+        // nothing to do.
+        break;
+
+      case MAP_JOIN:
+        // this magic method converts the join to a map-join and also fixes up the wiring of the tree.
+        ConvertJoinMapJoin.convertJoinMapJoin(joinOp, conf, bigTbl);
+        break;
+
+      case BUCKET_JOIN:
+        MapJoinOperator mapJoinOp = ConvertJoinMapJoin.convertJoinMapJoin(joinOp, conf, bigTbl);
+        MapJoinDesc joinDesc = mapJoinOp.getConf();
+        joinDesc.setBucketMapJoin(true);
+        Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>();
+        bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(),
+            RelMetadataQuery.splitCount(hiveJoin.getInput(bigTbl)));
+        joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping);
+        break;
+
+      case SMB_JOIN:
+        ConvertJoinMapJoin.convertJoinSMBJoin(joinOp, conf, bigTbl,
+            RelMetadataQuery.splitCount(hiveJoin.getInput(bigTbl)), true);
+        break;
+      case NONE:
+        break;
+
+      default:
+        break;
+      }
+    }
+
+    // TODO: null safes?
     if (LOG.isDebugEnabled()) {
@@ -696,6 +735,18 @@ private static JoinOperator genJoin(HiveJoin hiveJoin, JoinPredicateInfo joinPre
     return joinOp;
   }

+  private static int getBigTablePos(HiveJoin join) throws CalciteSemanticException {
+    switch (join.getMapJoinStreamingSide()) {
+    case LEFT_RELATION:
+      return 0;
+    case RIGHT_RELATION:
+      return 1;
+    case NONE:
+    default:
+      throw new CalciteSemanticException("Streaming side for join is not marked.");
+    }
+  }
+
   private static JoinType extractJoinType(HiveJoin join) {
     // UNIQUE
     if (join.isDistinct()) {
@@ -761,7 +812,7 @@ private static JoinType extractJoinType(HiveJoin join) {

   private static ExprNodeDesc convertToExprNode(RexNode rn, RelNode inputRel, String tabAlias) {
-    return (ExprNodeDesc) rn.accept(new ExprNodeConverter(tabAlias,
+    return rn.accept(new ExprNodeConverter(tabAlias,
        inputRel.getRowType(), false));
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index bf2a8dd..079930c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -609,9 +609,7 @@ Operator getOptimizedHiveOPDag() throws SemanticException {
     }

     RelNode modifiedOptimizedOptiqPlan = introduceProjectIfNeeded(optimizedOptiqPlan);
-
-    Operator hiveRoot = new HiveOpConverter(topOps, HiveOpConverter.getAggOPMode(conf),
-        conf.getVar(HiveConf.ConfVars.HIVEMAPREDMODE).equalsIgnoreCase("strict")).convert(modifiedOptimizedOptiqPlan);
+    Operator hiveRoot = new HiveOpConverter(topOps, conf).convert(modifiedOptimizedOptiqPlan);
     RowResolver hiveRootRR = genRowResolver(hiveRoot, getQB());
     opParseCtx.put(hiveRoot, new OpParseContext(hiveRootRR));
     return genFileSinkPlan(getQB().getParseInfo().getClauseNames().iterator().next(), getQB(), hiveRoot);
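
Note (not part of the patch): a minimal sketch of how the refactored static entry points in ConvertJoinMapJoin might be driven by a caller, under the assumption that a JoinOperator `joinOp`, a HiveJoin `hiveJoin`, and a big-table position `bigTblPos` are already in scope, as they are inside genJoin above. RelMetadataQuery.splitCount(...) returns an Integer that may be null, so the sketch guards the unboxing; the patch itself does not.

    // Hypothetical driver snippet; joinOp, hiveJoin, and bigTblPos are assumed
    // to be in scope as in genJoin above.
    HiveConf conf = new HiveConf();
    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");

    // MAP_JOIN case: rewires the operator tree in place and returns the new
    // map-join operator.
    MapJoinOperator mjOp = ConvertJoinMapJoin.convertJoinMapJoin(joinOp, conf, bigTblPos);

    // SMB_JOIN case: the bucket count is taken from Calcite's split-count
    // metadata for the big-table input.
    Integer buckets = RelMetadataQuery.splitCount(hiveJoin.getInput(bigTblPos));
    if (buckets != null) {
      ConvertJoinMapJoin.convertJoinSMBJoin(joinOp, conf, bigTblPos, buckets, true);
    }

The design point the refactoring enables: both helpers now depend only on a HiveConf rather than on OptimizeTezProcContext, so the Calcite-to-Hive translator (HiveOpConverter.genJoin) can apply the join algorithm chosen during CBO without constructing a Tez optimizer context.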