diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java index aa4ce17..977313a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java @@ -19,11 +19,11 @@ import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider; import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; -import org.apache.calcite.rel.metadata.RelMdDistribution; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdCollation; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistinctRowCount; +import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistribution; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdMemory; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdParallelism; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdRowCount; @@ -44,11 +44,9 @@ public HiveDefaultRelMetadataProvider(HiveConf hiveConf) { public RelMetadataProvider getMetadataProvider() { - // Init HiveRelMdParallelism with max split size - Double maxSplitSize = (double) HiveConf.getLongVar( + // Get max split size for HiveRelMdParallelism + final Double maxSplitSize = (double) HiveConf.getLongVar( this.hiveConf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE); - HiveRelMdParallelism hiveRelMdParallelism = - new HiveRelMdParallelism(maxSplitSize); // Return MD provider return ChainedRelMetadataProvider.of(ImmutableList @@ -58,8 +56,8 @@ public RelMetadataProvider getMetadataProvider() { HiveRelMdUniqueKeys.SOURCE, HiveRelMdSize.SOURCE, HiveRelMdMemory.SOURCE, - hiveRelMdParallelism.getMetadataProvider(), - RelMdDistribution.SOURCE, + new HiveRelMdParallelism(maxSplitSize).getMetadataProvider(), + HiveRelMdDistribution.SOURCE, HiveRelMdCollation.SOURCE, new DefaultRelMetadataProvider())); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java index 9e9cab1..8f981f2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java @@ -28,6 +28,11 @@ import org.apache.calcite.plan.RelOptAbstractTable; import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.RelOptUtil.InputFinder; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelFieldCollation.Direction; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.type.RelDataType; @@ -38,6 +43,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -45,6 +52,7 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; +import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.plan.ColStatistics; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -148,6 +156,47 @@ public RelNode toRel(ToRelContext context) { } @Override + public List getCollationList() { + ImmutableList.Builder collationList = new ImmutableList.Builder(); + for (Order sortColumn : this.hiveTblMetadata.getSortCols()) { + for (int i=0; i() + .add(RelCollationTraitDef.INSTANCE.canonize( + new HiveRelCollation(collationList.build()))) + .build(); + } + + @Override + public RelDistribution getDistribution() { + ImmutableList.Builder columnPositions = new ImmutableList.Builder(); + for (String bucketColumn : this.hiveTblMetadata.getBucketCols()) { + for (int i=0; i keysListBuilder = + new ImmutableList.Builder(); + ImmutableList.Builder leftKeysListBuilder = + new ImmutableList.Builder(); + ImmutableList.Builder rightKeysListBuilder = + new ImmutableList.Builder(); + JoinPredicateInfo joinPredInfo = + HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join); + for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) { + JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo. + getEquiJoinPredicateElements().get(i); + for (int leftPos : joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema()) { + keysListBuilder.add(leftPos); + leftKeysListBuilder.add(leftPos); + } + for (int rightPos : joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema()) { + keysListBuilder.add(rightPos); + rightKeysListBuilder.add(rightPos); + } + } + + RelDistribution distribution; + switch (join.getJoinAlgorithm()) { + case SMB_JOIN: + case BUCKET_JOIN: + case COMMON_JOIN: + distribution = new HiveRelDistribution( + RelDistribution.Type.HASH_DISTRIBUTED, keysListBuilder.build()); + break; + case MAP_JOIN: + // Keep buckets from the streaming relation + if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) { + distribution = new HiveRelDistribution( + RelDistribution.Type.HASH_DISTRIBUTED, leftKeysListBuilder.build()); + } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) { + distribution = new HiveRelDistribution( + RelDistribution.Type.HASH_DISTRIBUTED, rightKeysListBuilder.build()); + } else { + distribution = null; + } + break; + default: + distribution = null; + } + return distribution; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java index c4663e3..95c2be5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java @@ -23,9 +23,12 @@ import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.BuiltInMethod; +import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.JoinAlgorithm; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; public class HiveRelMdParallelism extends RelMdParallelism { @@ -60,6 +63,31 @@ public Boolean isPhaseTransition(HiveSort sort) { return true; } + public Integer splitCount(HiveJoin join) { + if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) { + return splitCountRepartition(join); + } + else if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN || + join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN || + join.getJoinAlgorithm() == JoinAlgorithm.SMB_JOIN) { + RelNode largeInput; + if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) { + largeInput = join.getLeft(); + } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) { + largeInput = join.getRight(); + } else { + return null; + } + return splitCount(largeInput); + } + return null; + } + + public Integer splitCount(HiveTableScan scan) { + RelOptHiveTable table = (RelOptHiveTable) scan.getTable(); + return table.getHiveTableMD().getNumBuckets(); + } + public Integer splitCount(RelNode rel) { Boolean newPhase = RelMetadataQuery.isPhaseTransition(rel); @@ -69,14 +97,7 @@ public Integer splitCount(RelNode rel) { if (newPhase) { // We repartition: new number of splits - final Double averageRowSize = RelMetadataQuery.getAverageRowSize(rel); - final Double rowCount = RelMetadataQuery.getRowCount(rel); - if (averageRowSize == null || rowCount == null) { - return null; - } - final Double totalSize = averageRowSize * rowCount; - final Double splitCount = totalSize / maxSplitSize; - return splitCount.intValue(); + return splitCountRepartition(rel); } // We do not repartition: take number of splits from children @@ -87,6 +108,18 @@ public Integer splitCount(RelNode rel) { return splitCount; } + public Integer splitCountRepartition(RelNode rel) { + // We repartition: new number of splits + final Double averageRowSize = RelMetadataQuery.getAverageRowSize(rel); + final Double rowCount = RelMetadataQuery.getRowCount(rel); + if (averageRowSize == null || rowCount == null) { + return null; + } + final Double totalSize = averageRowSize * rowCount; + final Double splitCount = totalSize / maxSplitSize; + return splitCount.intValue(); + } + } // End RelMdParallelism.java \ No newline at end of file