diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchange4JoinRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchange4JoinRule.java index 437c7e0..cdffb45 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchange4JoinRule.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchange4JoinRule.java @@ -26,11 +26,12 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.logical.LogicalExchange; -import org.apache.calcite.sql.JoinType; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelDistribution; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelDistribution; /** Not an optimization rule. * Rule to aid in translation from Calcite tree -> Hive tree. @@ -42,6 +43,9 @@ * Join */ public class HiveInsertExchange4JoinRule extends RelOptRule { + + protected static transient final Log LOG = LogFactory + .getLog(HiveInsertExchange4JoinRule.class); public HiveInsertExchange4JoinRule() { @@ -54,6 +58,11 @@ public HiveInsertExchange4JoinRule() { @Override public void onMatch(RelOptRuleCall call) { Join join = call.rel(0); + + if (call.rel(1) instanceof LogicalExchange && + call.rel(2) instanceof LogicalExchange) { + return; + } JoinPredicateInfo joinPredInfo = HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join); @@ -74,9 +83,9 @@ public void onMatch(RelOptRuleCall call) { Join newJoin = join.copy(join.getTraitSet(), join.getCondition(), left, right, join.getJoinType(), join.isSemiJoinDone()); -call.getPlanner().onCopy(join, newJoin); - -call.transformTo(newJoin); + call.getPlanner().onCopy(join, newJoin); + + call.transformTo(newJoin); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index c7ae279..0ddbbb1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -29,9 +29,12 @@ import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelDistribution.Type; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.SemiJoin; +import org.apache.calcite.rel.logical.LogicalExchange; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; @@ -100,6 +103,7 @@ private final Map> topOps; private final HIVEAGGOPMODE aggMode; private final boolean strictMode; + private int reduceSinkTagGenerator; public static HIVEAGGOPMODE getAggOPMode(HiveConf hc) { HIVEAGGOPMODE aggOpMode = HIVEAGGOPMODE.NO_SKEW_NO_MAP_SIDE_AGG; @@ -122,6 +126,7 @@ public HiveOpConverter(Map> topOps, this.topOps = topOps; this.aggMode = aggMode; this.strictMode = strictMode; + this.reduceSinkTagGenerator = 0; } private class OpAttr { @@ -163,6 +168,8 @@ OpAttr dispatch(RelNode rn) throws SemanticException { return visit((HiveSort) rn); } else if (rn instanceof HiveUnion) { return visit((HiveUnion) rn); + } else if (rn instanceof LogicalExchange) { + return visit((LogicalExchange) rn); } LOG.error(rn.getClass().getCanonicalName() + "operator translation not supported" + " yet in return path."); @@ -263,8 +270,11 @@ OpAttr visit(HiveProject projectRel) throws SemanticException { OpAttr visit(HiveJoin joinRel) throws SemanticException { // 1. Convert inputs OpAttr[] inputs = new OpAttr[joinRel.getInputs().size()]; + List> children = new ArrayList>( + joinRel.getInputs().size()); for (int i=0; i> children = new ArrayList>(); - for (int i = 0; i < inputs.length; i++) { - // Generate a ReduceSink operator for each join child - ReduceSinkOperator child = genReduceSink(inputs[i].inputs.get(0), joinKeys[i], - i, -1, Operation.NOT_ACID, strictMode); - children.add(child); - } - - // 5. Generate Join operator + // 4. Generate Join operator JoinOperator joinOp = genJoin(joinRel, joinPredInfo, children, joinKeys); - // 6. TODO: Extract condition for non-equi join elements (if any) and add it + // 5. TODO: Extract condition for non-equi join elements (if any) and add it - // 7. Virtual columns + // 6. Virtual columns Map vcolMap = new HashMap(); vcolMap.putAll(inputs[0].vcolMap); if (extractJoinType(joinRel) != JoinType.LEFTSEMI) { @@ -456,6 +457,33 @@ OpAttr visit(HiveUnion unionRel) throws SemanticException { return inputs[0].clone(unionOp); } + OpAttr visit(LogicalExchange exchangeRel) throws SemanticException { + OpAttr inputOpAf = dispatch(exchangeRel.getInput()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Translating operator rel#" + exchangeRel.getId() + ":" + exchangeRel.getRelTypeName() + + " with row type: [" + exchangeRel.getRowType() + "]"); + } + + RelDistribution distribution = exchangeRel.getDistribution(); + if (distribution.getType() != Type.HASH_DISTRIBUTED) { + throw new SemanticException("Only hash distribution supported for LogicalExchange"); + } + ExprNodeDesc[] expressions = new ExprNodeDesc[distribution.getKeys().size()]; + for (int i=0; i inputs) { ExprNodeDesc[][] joinKeys = new ExprNodeDesc[inputs.size()][];