diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4f2ea9a..d28f9d7 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2471,12 +2471,19 @@ HIVE_OPTIMIZE_BI_REWRITE_COUNTDISTINCT_ENABLED("hive.optimize.bi.rewrite.countdistinct.enabled", true, "Enables to rewrite COUNT(DISTINCT(X)) queries to be rewritten to use sketch functions."), - HIVE_OPTIMIZE_BI_REWRITE_COUNT_DISTINCT_SKETCH( "hive.optimize.bi.rewrite.countdistinct.sketch", "hll", new StringSet("hll"), "Defines which sketch type to use when rewriting COUNT(DISTINCT(X)) expressions. " + "Distinct counting can be done with: hll"), + HIVE_OPTIMIZE_BI_REWRITE_PERCENTILECONT_ENABLED("hive.optimize.bi.rewrite.percentile_cont.enabled", + true, + "Enables to rewrite PERCENTILE_CONT(X) queries to be rewritten to use sketch functions."), + HIVE_OPTIMIZE_BI_REWRITE_PERCENTILECONT_SKETCH( + "hive.optimize.bi.rewrite.percentile_cont.sketch", "kll", + new StringSet("kll"), + "Defines which sketch type to use when rewriting PERCENTILE_CONT expressions. 
Options: kll"), + // Statistics HIVE_STATS_ESTIMATE_STATS("hive.stats.estimate", true, diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 39e78d6..2990cfd 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -829,7 +829,8 @@ schq_ingest.q,\ sketches_hll.q,\ sketches_theta.q,\ - sketches_rewrite.q,\ + sketches_rewrite_count_distinct.q,\ + sketches_rewrite_percentile_cont.q,\ sketches_materialized_view_rollup.q,\ sketches_materialized_view_rollup2.q,\ sketches_materialized_view_safety.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java index 8865380..cc48d5b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java @@ -65,7 +65,7 @@ private static final String GET_CDF = "cdf"; private static final String GET_PMF = "pmf"; private static final String GET_QUANTILES = "quantiles"; - private static final String GET_QUANTILE = "quantile"; + public static final String GET_QUANTILE = "quantile"; private static final String GET_RANK = "rank"; private static final String INTERSECT_SKETCH = "intersect"; private static final String INTERSECT_SKETCH1 = "intersect_f"; @@ -109,7 +109,8 @@ } SketchDescriptor sc = sketchClasses.get(className); if (!sc.fnMap.containsKey(function)) { - throw new IllegalArgumentException(String.format("The Sketch-class '%s' doesn't have a '%s' method", function)); + throw new IllegalArgumentException( + String.format("The Sketch-class '%s' doesn't have a '%s' method", className, function)); } return sketchClasses.get(className).fnMap.get(function); } @@ -128,6 +129,7 @@ SketchFunctionDescriptor sketchSFD = sd.fnMap.get(DATA_TO_SKETCH); SketchFunctionDescriptor unionSFD = sd.fnMap.get(UNION_SKETCH); 
SketchFunctionDescriptor estimateSFD = sd.fnMap.get(SKETCH_TO_ESTIMATE); + SketchFunctionDescriptor quantileSFD = sd.fnMap.get(GET_QUANTILE); if (sketchSFD == null || unionSFD == null) { continue; @@ -163,6 +165,20 @@ estimateSFD.setCalciteFunction(estimateFn); } + + if (quantileSFD != null && quantileSFD.getReturnRelDataType().isPresent()) { + SqlFunction quantileFn = new HiveSqlFunction(quantileSFD.name, + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(quantileSFD.getReturnRelDataType().get().getSqlTypeName()), + InferTypes.ANY_NULLABLE, + OperandTypes.family(), + SqlFunctionCategory.USER_DEFINED_FUNCTION, + true, + false); + + quantileSFD.setCalciteFunction(quantileFn); + + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteCountDistinctToDataSketches.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteCountDistinctToDataSketches.java deleted file mode 100644 index c23e2c4..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteCountDistinctToDataSketches.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.optimizer.calcite.rules; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.calcite.plan.RelOptRule; -import org.apache.calcite.plan.RelOptRuleCall; -import org.apache.calcite.rel.RelCollation; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.RelFactories.ProjectFactory; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlAggFunction; -import org.apache.calcite.sql.SqlOperator; -import org.apache.hadoop.hive.ql.exec.DataSketchesFunctions; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; -import org.apache.hive.plugin.api.HiveUDFPlugin.UDFDescriptor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.ImmutableList; - -/** - * This rule could rewrite {@code count(distinct(x))} calls to be calculated using sketch based functions. - * - * The transformation here works on Aggregate nodes; the operations done are the following: - * - * 1. Identify candidate {@code count(distinct)} aggregate calls - * 2. A new Aggregate is created in which the aggregation is done by the sketch function - * 3. 
A new Project is inserted on top of the Aggregate; which unwraps the resulting - * count-distinct estimation from the sketch representation - */ -public final class HiveRewriteCountDistinctToDataSketches extends RelOptRule { - - protected static final Logger LOG = LoggerFactory.getLogger(HiveRewriteCountDistinctToDataSketches.class); - private final String sketchClass; - private final ProjectFactory projectFactory; - - public HiveRewriteCountDistinctToDataSketches(String sketchClass) { - super(operand(HiveAggregate.class, any())); - this.sketchClass = sketchClass; - projectFactory = HiveRelFactories.HIVE_PROJECT_FACTORY; - } - - @Override - public void onMatch(RelOptRuleCall call) { - final Aggregate aggregate = call.rel(0); - - if (aggregate.getGroupSets().size() != 1) { - // not yet supported - return; - } - - List newAggCalls = new ArrayList(); - - VBuilder vb = new VBuilder(aggregate); - - if (aggregate.getAggCallList().equals(vb.newAggCalls)) { - // rule didn't made any changes - return; - } - - newAggCalls = vb.newAggCalls; - RelNode newAgg = aggregate.copy(aggregate.getTraitSet(), aggregate.getInput(), aggregate.getGroupSet(), - aggregate.getGroupSets(), newAggCalls); - - RelNode newProject = projectFactory.createProject(newAgg, vb.newProjects, aggregate.getRowType().getFieldNames()); - - call.transformTo(newProject); - return; - } - - /** - * Helper class to help in building a new Aggregate and Project. 
- */ - // NOTE: methods in this class are not re-entrant; drop-to-frame to constructor during debugging - class VBuilder { - - private Aggregate aggregate; - private List newAggCalls; - private List newProjects; - private final RexBuilder rexBuilder; - - public VBuilder(Aggregate aggregate) { - this.aggregate = aggregate; - newAggCalls = new ArrayList(); - newProjects = new ArrayList(); - rexBuilder = aggregate.getCluster().getRexBuilder(); - - // add non-aggregated fields - as identity projections - addGroupFields(); - - for (AggregateCall aggCall : aggregate.getAggCallList()) { - processAggCall(aggCall); - } - } - - private void addGroupFields() { - for (int i = 0; i < aggregate.getGroupCount(); i++) { - newProjects.add(rexBuilder.makeInputRef(aggregate, 0)); - } - } - - private void processAggCall(AggregateCall aggCall) { - if (isSimpleCountDistinct(aggCall)) { - rewriteCountDistinct(aggCall); - return; - } - appendAggCall(aggCall, null); - } - - private void appendAggCall(AggregateCall aggCall, SqlOperator projectOperator) { - RelDataType origType = aggregate.getRowType().getFieldList().get(newProjects.size()).getType(); - RexNode projRex = rexBuilder.makeInputRef(aggCall.getType(), newProjects.size()); - if (projectOperator != null) { - projRex = rexBuilder.makeCall(projectOperator, ImmutableList.of(projRex)); - projRex = rexBuilder.makeCast(origType, projRex); - } - newAggCalls.add(aggCall); - newProjects.add(projRex); - } - - private boolean isSimpleCountDistinct(AggregateCall aggCall) { - return aggCall.isDistinct() && aggCall.getArgList().size() == 1 - && aggCall.getAggregation().getName().equalsIgnoreCase("count") && !aggCall.hasFilter(); - } - - private void rewriteCountDistinct(AggregateCall aggCall) { - SqlAggFunction aggFunction = (SqlAggFunction) getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH); - boolean distinct = false; - boolean approximate = true; - boolean ignoreNulls = aggCall.ignoreNulls(); - List argList = aggCall.getArgList(); - int 
filterArg = aggCall.filterArg; - RelCollation collation = aggCall.getCollation(); - int groupCount = aggregate.getGroupCount(); - RelNode input = aggregate.getInput(); - RelDataType type = rexBuilder.deriveReturnType(aggFunction, Collections.emptyList()); - String name = aggFunction.getName(); - - AggregateCall ret = AggregateCall.create(aggFunction, distinct, approximate, ignoreNulls, argList, filterArg, - collation, groupCount, input, type, name); - - appendAggCall(ret, getSqlOperator(DataSketchesFunctions.SKETCH_TO_ESTIMATE)); - } - - private SqlOperator getSqlOperator(String fnName) { - UDFDescriptor fn = DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName); - if (!fn.getCalciteFunction().isPresent()) { - throw new RuntimeException(fn.toString() + " doesn't have a Calcite function associated with it"); - } - return fn.getCalciteFunction().get(); - } - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRule.java new file mode 100644 index 0000000..9b5c1fb --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRule.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.RelFactories.ProjectFactory; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableBitSet.Builder; +import org.apache.curator.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.hive.ql.exec.DataSketchesFunctions; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hive.plugin.api.HiveUDFPlugin.UDFDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableList; + +/** + * This rule could rewrite aggregate calls to be calculated using sketch based functions. + * + *
+ * Currently it can rewrite: + * <ul> + * <li>{@code count(distinct(x))} to distinct counting sketches</li> + * <li>{@code percentile_cont(0.2) within group (order by id)}</li> + * </ul> + * + * <p> + * The transformation here works on Aggregate nodes; the operations done are the following: + * <ol> + * <li>Identify candidate aggregate calls</li> + * <li>A new Project is inserted below the Aggregate; to help with data pre-processing</li> + * <li>A new Aggregate is created in which the aggregation is done by the sketch function</li> + * <li>A new Project is inserted on top of the Aggregate; which unwraps the resulting + * count-distinct estimation from the sketch representation</li> + * </ol>
+ */ +public final class HiveRewriteToDataSketchesRule extends RelOptRule { + + protected static final Logger LOG = LoggerFactory.getLogger(HiveRewriteToDataSketchesRule.class); + private final Optional countDistinctSketchType; + private final Optional percentileContSketchType; + private final ProjectFactory projectFactory; + + public HiveRewriteToDataSketchesRule(Optional countDistinctSketchType, + Optional percentileContSketchType) { + super(operand(HiveAggregate.class, any())); + this.countDistinctSketchType = countDistinctSketchType; + this.percentileContSketchType = percentileContSketchType; + projectFactory = HiveRelFactories.HIVE_PROJECT_FACTORY; + } + + @Override + public void onMatch(RelOptRuleCall call) { + final Aggregate aggregate = call.rel(0); + + if (aggregate.getGroupSets().size() != 1) { + // not yet supported + return; + } + + List newAggCalls = new ArrayList(); + + VBuilder vb = new VBuilder(aggregate); + + if (aggregate.getAggCallList().equals(vb.newAggCalls)) { + // rule didn't make any changes + return; + } + + newAggCalls = vb.newAggCalls; + List fieldNames=new ArrayList(); + for (int i=0;i newAggCalls; + private List newProjects; + private List newProjectsBelow; + private List rewrites; + + public VBuilder(Aggregate aggregate) { + this.aggregate = aggregate; + newAggCalls = new ArrayList(); + newProjects = new ArrayList(); + newProjectsBelow = new ArrayList(); + rexBuilder = aggregate.getCluster().getRexBuilder(); + rewrites = new ArrayList(); + + // add identity projections + addProjectedFields(); + + if (countDistinctSketchType.isPresent()) { + rewrites.add(new CountDistinctRewrite(countDistinctSketchType.get())); + } + if (percentileContSketchType.isPresent()) { + rewrites.add(new PercentileContRewrite(percentileContSketchType.get())); + } + + for (AggregateCall aggCall : aggregate.getAggCallList()) { + processAggCall(aggCall); + } + } + + private void addProjectedFields() { + for (int i = 0; i < aggregate.getGroupCount(); i++) { + 
newProjects.add(rexBuilder.makeInputRef(aggregate.getInput(), i)); + } + Builder b = ImmutableBitSet.builder(); + b.addAll(aggregate.getGroupSet()); + for (AggregateCall aggCall: aggregate.getAggCallList()) { + b.addAll(aggCall.getArgList()); + } + ImmutableBitSet inputs = b.build(); + Integer maxIdx = Collections.max(inputs.asSet()); + for (int i = 0; i < maxIdx; i++) { + newProjectsBelow.add(rexBuilder.makeInputRef(aggregate.getInput(), i)); + } + } + + private void processAggCall(AggregateCall aggCall) { + for (RewriteProcedure rewrite : rewrites) { + if (rewrite.isApplicable(aggCall)) { + rewrite.rewrite(aggCall); + return; + } + } + appendAggCall(aggCall); + } + + private void appendAggCall(AggregateCall aggCall) { + RexNode projRex = rexBuilder.makeInputRef(aggCall.getType(), newProjects.size()); + + newAggCalls.add(aggCall); + newProjects.add(projRex); + } + + abstract class RewriteProcedure { + + private final String sketchClass; + + public RewriteProcedure(String sketchClass) { + this.sketchClass = sketchClass; + } + + abstract boolean isApplicable(AggregateCall aggCall); + abstract void rewrite(AggregateCall aggCall); + + protected SqlOperator getSqlOperator(String fnName) { + UDFDescriptor fn = DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName); + if (!fn.getCalciteFunction().isPresent()) { + throw new RuntimeException(fn.toString() + " doesn't have a Calcite function associated with it"); + } + return fn.getCalciteFunction().get(); + } + + } + + class CountDistinctRewrite extends RewriteProcedure { + + public CountDistinctRewrite(String sketchClass) { + super(sketchClass); + } + + @Override + boolean isApplicable(AggregateCall aggCall) { + return aggCall.isDistinct() && aggCall.getArgList().size() == 1 + && aggCall.getAggregation().getName().equalsIgnoreCase("count") && !aggCall.hasFilter(); + } + + @Override + void rewrite(AggregateCall aggCall) { + RelDataType origType = 
aggregate.getRowType().getFieldList().get(newProjects.size()).getType(); + + Integer argIndex = aggCall.getArgList().get(0); + RexNode call = rexBuilder.makeInputRef(aggregate.getInput(), argIndex); + newProjectsBelow.add(call); + + ArrayList newArgList = Lists.newArrayList(newProjectsBelow.size() - 1); + + SqlAggFunction aggFunction = (SqlAggFunction) getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH); + boolean distinct = false; + boolean approximate = true; + boolean ignoreNulls = true; + List argList = newArgList; + int filterArg = aggCall.filterArg; + RelCollation collation = aggCall.getCollation(); + RelDataType type = rexBuilder.deriveReturnType(aggFunction, Collections.emptyList()); + String name = aggFunction.getName(); + + AggregateCall newAgg = AggregateCall.create(aggFunction, distinct, approximate, ignoreNulls, argList, filterArg, + collation, type, name); + + SqlOperator projectOperator = getSqlOperator(DataSketchesFunctions.SKETCH_TO_ESTIMATE); + RexNode projRex = rexBuilder.makeInputRef(newAgg.getType(), newProjects.size()); + projRex = rexBuilder.makeCall(projectOperator, ImmutableList.of(projRex)); + projRex = rexBuilder.makeCall(SqlStdOperatorTable.ROUND, ImmutableList.of(projRex)); + projRex = rexBuilder.makeCast(origType, projRex); + + newAggCalls.add(newAgg); + newProjects.add(projRex); + } + + } + + class PercentileContRewrite extends RewriteProcedure { + + public PercentileContRewrite(String sketchClass) { + super(sketchClass); + } + + @Override + boolean isApplicable(AggregateCall aggCall) { + // FIXME: also check that args are: ?,?,1,0 - other cases are not supported + return !aggCall.isDistinct() && aggCall.getArgList().size() == 4 + && aggCall.getAggregation().getName().equalsIgnoreCase("percentile_cont") && !aggCall.hasFilter(); + } + + @Override + void rewrite(AggregateCall aggCall) { + RelDataType origType = aggregate.getRowType().getFieldList().get(newProjects.size()).getType(); + + Integer argIndex = aggCall.getArgList().get(1); + 
RexNode call = rexBuilder.makeInputRef(aggregate.getInput(), argIndex); + + RelDataType floatType = rexBuilder.getTypeFactory().createSqlType(SqlTypeName.FLOAT); + call = rexBuilder.makeCast(floatType, call); + newProjectsBelow.add(call); + + ArrayList newArgList = Lists.newArrayList(newProjectsBelow.size() - 1); + + SqlAggFunction aggFunction = (SqlAggFunction) getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH); + boolean distinct = false; + boolean approximate = true; + boolean ignoreNulls = true; + List argList = newArgList; + int filterArg = aggCall.filterArg; + RelCollation collation = aggCall.getCollation(); + RelDataType type = rexBuilder.deriveReturnType(aggFunction, Collections.emptyList()); + String name = aggFunction.getName(); + + AggregateCall newAgg = AggregateCall.create(aggFunction, distinct, approximate, ignoreNulls, argList, filterArg, + collation, type, name); + + Integer origFractionIdx = aggCall.getArgList().get(0); + RexNode fraction = getProject(aggregate.getInput()).getChildExps().get(origFractionIdx); + fraction = rexBuilder.makeCast(floatType, fraction); + + SqlOperator projectOperator = getSqlOperator(DataSketchesFunctions.GET_QUANTILE); + RexNode projRex = rexBuilder.makeInputRef(newAgg.getType(), newProjects.size()); + projRex = rexBuilder.makeCall(projectOperator, ImmutableList.of(projRex, fraction)); + projRex = rexBuilder.makeCast(origType, projRex); + + newAggCalls.add(newAgg); + newProjects.add(projRex); + + } + + } + + private Project getProject(RelNode input) { + if (input instanceof Project) { + return (Project) input; + } + if (input instanceof HepRelVertex) { + HepRelVertex hepRelVertex = (HepRelVertex) input; + return (Project) hepRelVertex.getCurrentRel(); + } + throw new RuntimeException("unexpected"); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 085de48..133cd01 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -237,7 +237,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRelFieldTrimmer; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRemoveGBYSemiJoinRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRemoveSqCountCheck; -import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRewriteCountDistinctToDataSketches; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRewriteToDataSketchesRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRulesRegistry; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSemiJoinRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortJoinReduceRule; @@ -1973,10 +1973,17 @@ if (!isMaterializedViewMaintenance() && conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_BI_ENABLED)) { // Rewrite to datasketches if enabled + Optional countDistinctSketchType = Optional.empty(); + Optional percentileContSketchType = Optional.empty(); if (conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_COUNTDISTINCT_ENABLED)) { - String sketchClass = conf.getVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_COUNT_DISTINCT_SKETCH); - generatePartialProgram(program, true, HepMatchOrder.TOP_DOWN, - new HiveRewriteCountDistinctToDataSketches(sketchClass)); + countDistinctSketchType= Optional.of(conf.getVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_COUNT_DISTINCT_SKETCH)); + } + if (conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_PERCENTILECONT_ENABLED)) { + percentileContSketchType = Optional.of(conf.getVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_PERCENTILECONT_SKETCH)); + } + if (countDistinctSketchType.isPresent() || percentileContSketchType.isPresent()) { + RelOptRule rule = new HiveRewriteToDataSketchesRule(countDistinctSketchType, percentileContSketchType); + generatePartialProgram(program, true, HepMatchOrder.TOP_DOWN, rule); } } // Run this optimization early, since it 
is expanding the operator pipeline. diff --git ql/src/test/queries/clientpositive/sketches_rewrite.q ql/src/test/queries/clientpositive/sketches_rewrite.q deleted file mode 100644 index 0420d62..0000000 --- ql/src/test/queries/clientpositive/sketches_rewrite.q +++ /dev/null @@ -1,19 +0,0 @@ ---! qt:transactional - -set hive.optimize.bi.enabled=true; - -create table sketch_input (id int, category char(1)) -STORED AS ORC -TBLPROPERTIES ('transactional'='true'); - -insert into table sketch_input values - (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'), - (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b') -; - --- see if rewrite happens -explain -select category, count(distinct id) from sketch_input group by category; - -select category, count(distinct id) from sketch_input group by category; - diff --git ql/src/test/queries/clientpositive/sketches_rewrite_count_distinct.q ql/src/test/queries/clientpositive/sketches_rewrite_count_distinct.q new file mode 100644 index 0000000..0420d62 --- /dev/null +++ ql/src/test/queries/clientpositive/sketches_rewrite_count_distinct.q @@ -0,0 +1,19 @@ +--! 
qt:transactional + +set hive.optimize.bi.enabled=true; + +create table sketch_input (id int, category char(1)) +STORED AS ORC +TBLPROPERTIES ('transactional'='true'); + +insert into table sketch_input values + (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'), + (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b') +; + +-- see if rewrite happens +explain +select category, count(distinct id) from sketch_input group by category; + +select category, count(distinct id) from sketch_input group by category; + diff --git ql/src/test/queries/clientpositive/sketches_rewrite_percentile_cont.q ql/src/test/queries/clientpositive/sketches_rewrite_percentile_cont.q new file mode 100644 index 0000000..38a8793 --- /dev/null +++ ql/src/test/queries/clientpositive/sketches_rewrite_percentile_cont.q @@ -0,0 +1,20 @@ +--! qt:transactional + + +create table sketch_input (id int, category char(1)) +STORED AS ORC +TBLPROPERTIES ('transactional'='true'); + +insert into table sketch_input values + (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'), + (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b') +; + +set hive.optimize.bi.enabled=true; + +-- see if rewrite happens +explain +select percentile_cont(0.2) within group(order by id) from sketch_input; + +select percentile_cont(0.2) within group(order by id) from sketch_input; + diff --git ql/src/test/results/clientpositive/llap/sketches_rewrite.q.out ql/src/test/results/clientpositive/llap/sketches_rewrite.q.out deleted file mode 100644 index dedcff9..0000000 --- ql/src/test/results/clientpositive/llap/sketches_rewrite.q.out +++ /dev/null @@ -1,110 +0,0 @@ -PREHOOK: query: create table sketch_input (id int, category char(1)) -STORED AS ORC -TBLPROPERTIES ('transactional'='true') -PREHOOK: type: CREATETABLE 
-PREHOOK: Output: database:default -PREHOOK: Output: default@sketch_input -POSTHOOK: query: create table sketch_input (id int, category char(1)) -STORED AS ORC -TBLPROPERTIES ('transactional'='true') -POSTHOOK: type: CREATETABLE -POSTHOOK: Output: database:default -POSTHOOK: Output: default@sketch_input -PREHOOK: query: insert into table sketch_input values - (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'), - (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b') -PREHOOK: type: QUERY -PREHOOK: Input: _dummy_database@_dummy_table -PREHOOK: Output: default@sketch_input -POSTHOOK: query: insert into table sketch_input values - (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'), - (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b') -POSTHOOK: type: QUERY -POSTHOOK: Input: _dummy_database@_dummy_table -POSTHOOK: Output: default@sketch_input -POSTHOOK: Lineage: sketch_input.category SCRIPT [] -POSTHOOK: Lineage: sketch_input.id SCRIPT [] -PREHOOK: query: explain -select category, count(distinct id) from sketch_input group by category -PREHOOK: type: QUERY -PREHOOK: Input: default@sketch_input -#### A masked pattern was here #### -POSTHOOK: query: explain -select category, count(distinct id) from sketch_input group by category -POSTHOOK: type: QUERY -POSTHOOK: Input: default@sketch_input -#### A masked pattern was here #### -STAGE DEPENDENCIES: - Stage-1 is a root stage - Stage-0 depends on stages: Stage-1 - -STAGE PLANS: - Stage: Stage-1 - Tez -#### A masked pattern was here #### - Edges: - Reducer 2 <- Map 1 (SIMPLE_EDGE) -#### A masked pattern was here #### - Vertices: - Map 1 - Map Operator Tree: - TableScan - alias: sketch_input - Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - 
expressions: id (type: int), category (type: char(1)) - outputColumnNames: id, category - Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - aggregations: ds_hll_sketch(id) - keys: category (type: char(1)) - minReductionHashAggr: 0.9090909 - mode: hash - outputColumnNames: _col0, _col1 - Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - key expressions: _col0 (type: char(1)) - null sort order: z - sort order: + - Map-reduce partition columns: _col0 (type: char(1)) - Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col1 (type: struct) - Execution mode: llap - LLAP IO: may be used (ACID table) - Reducer 2 - Execution mode: llap - Reduce Operator Tree: - Group By Operator - aggregations: ds_hll_sketch(VALUE._col0) - keys: KEY._col0 (type: char(1)) - mode: mergepartial - outputColumnNames: _col0, _col1 - Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - expressions: _col0 (type: char(1)), UDFToLong(ds_hll_estimate(_col1)) (type: bigint) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 2 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE - File Output Operator - compressed: false - Statistics: Num rows: 2 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - - Stage: Stage-0 - Fetch Operator - limit: -1 - Processor Tree: - ListSink - -PREHOOK: query: select category, count(distinct id) from sketch_input group by category -PREHOOK: type: QUERY -PREHOOK: Input: default@sketch_input -#### A masked pattern was here #### -POSTHOOK: query: select category, count(distinct id) from sketch_input group by category -POSTHOOK: 
type: QUERY -POSTHOOK: Input: default@sketch_input -#### A masked pattern was here #### -a 10 -b 10 diff --git ql/src/test/results/clientpositive/llap/sketches_rewrite_count_distinct.q.out ql/src/test/results/clientpositive/llap/sketches_rewrite_count_distinct.q.out new file mode 100644 index 0000000..8c556ef --- /dev/null +++ ql/src/test/results/clientpositive/llap/sketches_rewrite_count_distinct.q.out @@ -0,0 +1,110 @@ +PREHOOK: query: create table sketch_input (id int, category char(1)) +STORED AS ORC +TBLPROPERTIES ('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@sketch_input +POSTHOOK: query: create table sketch_input (id int, category char(1)) +STORED AS ORC +TBLPROPERTIES ('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@sketch_input +PREHOOK: query: insert into table sketch_input values + (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'), + (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@sketch_input +POSTHOOK: query: insert into table sketch_input values + (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'), + (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@sketch_input +POSTHOOK: Lineage: sketch_input.category SCRIPT [] +POSTHOOK: Lineage: sketch_input.id SCRIPT [] +PREHOOK: query: explain +select category, count(distinct id) from sketch_input group by category +PREHOOK: type: QUERY +PREHOOK: Input: default@sketch_input +#### A masked pattern was here #### +POSTHOOK: query: explain +select category, 
count(distinct id) from sketch_input group by category +POSTHOOK: type: QUERY +POSTHOOK: Input: default@sketch_input +#### A masked pattern was here #### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: sketch_input + Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: id (type: int), category (type: char(1)) + outputColumnNames: id, category + Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: ds_hll_sketch(id) + keys: category (type: char(1)) + minReductionHashAggr: 0.9090909 + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: char(1)) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: char(1)) + Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: struct) + Execution mode: llap + LLAP IO: may be used (ACID table) + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: ds_hll_sketch(VALUE._col0) + keys: KEY._col0 (type: char(1)) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: char(1)), UDFToLong(round(ds_hll_estimate(_col1))) (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 2 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 2 Data size: 186 Basic stats: COMPLETE Column 
stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select category, count(distinct id) from sketch_input group by category +PREHOOK: type: QUERY +PREHOOK: Input: default@sketch_input +#### A masked pattern was here #### +POSTHOOK: query: select category, count(distinct id) from sketch_input group by category +POSTHOOK: type: QUERY +POSTHOOK: Input: default@sketch_input +#### A masked pattern was here #### +a 10 +b 10 diff --git ql/src/test/results/clientpositive/llap/sketches_rewrite_percentile_cont.q.out ql/src/test/results/clientpositive/llap/sketches_rewrite_percentile_cont.q.out new file mode 100644 index 0000000..8d7ad7f --- /dev/null +++ ql/src/test/results/clientpositive/llap/sketches_rewrite_percentile_cont.q.out @@ -0,0 +1,105 @@ +PREHOOK: query: create table sketch_input (id int, category char(1)) +STORED AS ORC +TBLPROPERTIES ('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@sketch_input +POSTHOOK: query: create table sketch_input (id int, category char(1)) +STORED AS ORC +TBLPROPERTIES ('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@sketch_input +PREHOOK: query: insert into table sketch_input values + (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'), + (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@sketch_input +POSTHOOK: query: insert into table sketch_input values + (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 
'a'), (8, 'a'), (9, 'a'), (10, 'a'), + (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@sketch_input +POSTHOOK: Lineage: sketch_input.category SCRIPT [] +POSTHOOK: Lineage: sketch_input.id SCRIPT [] +PREHOOK: query: explain +select percentile_cont(0.2) within group(order by id) from sketch_input +PREHOOK: type: QUERY +PREHOOK: Input: default@sketch_input +#### A masked pattern was here #### +POSTHOOK: query: explain +select percentile_cont(0.2) within group(order by id) from sketch_input +POSTHOOK: type: QUERY +POSTHOOK: Input: default@sketch_input +#### A masked pattern was here #### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: sketch_input + Statistics: Num rows: 22 Data size: 88 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: UDFToFloat(id) (type: float) + outputColumnNames: _col0 + Statistics: Num rows: 22 Data size: 88 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: ds_kll_sketch(_col0) + minReductionHashAggr: 0.95454544 + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 144 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 144 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: binary) + Execution mode: llap + LLAP IO: may be used (ACID table) + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: ds_kll_sketch(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 
Data size: 144 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: UDFToDouble(ds_kll_quantile(_col0, 0.2)) (type: double) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select percentile_cont(0.2) within group(order by id) from sketch_input +PREHOOK: type: QUERY +PREHOOK: Input: default@sketch_input +#### A masked pattern was here #### +POSTHOOK: query: select percentile_cont(0.2) within group(order by id) from sketch_input +POSTHOOK: type: QUERY +POSTHOOK: Input: default@sketch_input +#### A masked pattern was here #### +4.0