diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index ba46ff7..2395c0c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -43,6 +43,7 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeImpl;
 import org.apache.calcite.rex.RexBuilder;
@@ -287,10 +288,6 @@ private static RelNode createTableScan(Table viewTable) {
     else {
       fullyQualifiedTabName = viewTable.getTableName();
     }
-    RelOptHiveTable optTable = new RelOptHiveTable(null, fullyQualifiedTabName,
-        rowType, viewTable, nonPartitionColumns, partitionColumns, new ArrayList(),
-        SessionState.get().getConf(), new HashMap(),
-        new HashMap(), new AtomicInteger());
     RelNode tableRel;
 
     // 3. Build operator
@@ -302,8 +299,18 @@ private static RelNode createTableScan(Table viewTable) {
       Set<String> metrics = new HashSet<>();
       List<RelDataType> druidColTypes = new ArrayList<>();
       List<String> druidColNames = new ArrayList<>();
+      //@NOTE this code is very similar to the code at org/apache/hadoop/hive/ql/parse/CalcitePlanner.java:2362
+      //@TODO it will be nice to refactor it
+      RelDataTypeFactory dtFactory = cluster.getRexBuilder().getTypeFactory();
       for (RelDataTypeField field : rowType.getFieldList()) {
         druidColTypes.add(field.getType());
+        if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(field.getName())) {
+          // Druid's time column is always not null.
+          druidColTypes.add(dtFactory.createTypeWithNullability(field.getType(), false));
+        } else {
+          druidColTypes.add(field.getType());
+        }
+        druidColNames.add(field.getName());
         if (field.getName().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
           // timestamp
@@ -315,10 +322,14 @@ private static RelNode createTableScan(Table viewTable) {
         }
         metrics.add(field.getName());
       }
+      rowType = dtFactory.createStructType(druidColTypes, druidColNames);
       // TODO: Default interval will be an Interval once Calcite 1.15.0 is released.
       // We will need to update the type of this list.
       List intervals = Arrays.asList(DruidTable.DEFAULT_INTERVAL);
-
+      RelOptHiveTable optTable = new RelOptHiveTable(null, fullyQualifiedTabName,
+          rowType, viewTable, nonPartitionColumns, partitionColumns, new ArrayList(),
+          SessionState.get().getConf(), new HashMap(),
+          new HashMap(), new AtomicInteger());
       DruidTable druidTable = new DruidTable(new DruidSchema(address, address, false),
           dataSource, RelDataTypeImpl.proto(rowType), metrics, DruidTable.DEFAULT_TIMESTAMP_COLUMN,
           intervals, null, null);
@@ -328,6 +339,10 @@ private static RelNode createTableScan(Table viewTable) {
           optTable, druidTable, ImmutableList.of(scan));
     } else {
       // Build Hive Table Scan Rel
+      RelOptHiveTable optTable = new RelOptHiveTable(null, fullyQualifiedTabName,
+          rowType, viewTable, nonPartitionColumns, partitionColumns, new ArrayList(),
+          SessionState.get().getConf(), new HashMap(),
+          new HashMap(), new AtomicInteger());
       tableRel = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), optTable,
           viewTable.getTableName(), null, false, false);
     }
@@ -393,7 +408,7 @@ private static TableType obtainTableType(Table tabMetaData) {
     }
     return TableType.NATIVE;
   }
-
+  //@TODO this seems to be the same as org.apache.hadoop.hive.ql.parse.CalcitePlanner.TableType.DRUID do we really need both
   private enum TableType {
     DRUID,
     NATIVE
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveReduceExpressionsRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveReduceExpressionsRule.java
index 9a5d8e2..c7ed318 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveReduceExpressionsRule.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveReduceExpressionsRule.java
@@ -16,22 +16,30 @@
  */
 package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPredicateList;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexExecutor;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSimplify;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.Util;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
@@ -103,6 +111,33 @@ public FilterReduceExpressionsRule(Class<? extends Filter> filterClass,
       super(filterClass, relBuilderFactory, "ReduceExpressionsRule(Filter)");
     }
 
+    protected static boolean reduceExpressions(RelNode rel, List<RexNode> expList,
+        RelOptPredicateList predicates, boolean unknownAsFalse, boolean matchNullability) {
+      final RelOptCluster cluster = rel.getCluster();
+      final RexBuilder rexBuilder = cluster.getRexBuilder();
+      final RexExecutor executor =
+          Util.first(cluster.getPlanner().getExecutor(), RexUtil.EXECUTOR);
+      final RexSimplify simplify =
+          new RexSimplify(rexBuilder, unknownAsFalse, executor);
+
+      // Simplify predicates in place
+      boolean reduced = reduceExpressionsInternal(rel, simplify, expList,
+          predicates);
+
+      final ExprSimplifier simplifier = new ExprSimplifier(simplify, matchNullability, unknownAsFalse);
+      boolean simplified = false;
+      for (int i = 0; i < expList.size(); i++) {
+        RexNode expr2 = simplifier.apply(expList.get(i));
+        if (!expr2.toString().equals(expList.get(i).toString())) {
+          expList.remove(i);
+          expList.add(i, expr2);
+          simplified = true;
+        }
+      }
+
+      return reduced || simplified;
+    }
+
     @Override public void onMatch(RelOptRuleCall call) {
       final Filter filter = call.rel(0);
       final List<RexNode> expList =
@@ -112,7 +147,7 @@ public FilterReduceExpressionsRule(Class<? extends Filter> filterClass,
       final RelMetadataQuery mq = call.getMetadataQuery();
       final RelOptPredicateList predicates =
           mq.getPulledUpPredicates(filter.getInput());
-      if (reduceExpressions(filter, expList, predicates, true)) {
+      if (reduceExpressions(filter, expList, predicates, true, false)) {
         assert expList.size() == 1;
         newConditionExp = expList.get(0);
         reduced = true;
@@ -223,6 +258,55 @@ private void reduceNotNullableFilter(
         }
       }
     }
+
+    private static class ExprSimplifier extends RexShuttle {
+      private final RexSimplify simplify;
+      private final Map<RexNode, Boolean> unknownAsFalseMap;
+      private final boolean matchNullability;
+      private final boolean unknownAsFalse;
+
+      public ExprSimplifier(RexSimplify simplify, boolean matchNullability, boolean unknownAsFalse) {
+        this.simplify = simplify;
+        this.unknownAsFalseMap = new HashMap<>();
+        this.matchNullability = matchNullability;
+        this.unknownAsFalse = unknownAsFalse;
+      }
+
+      @Override public RexNode visitCall(RexCall call) {
+        boolean unknownAsFalseCall = unknownAsFalse;
+        if (unknownAsFalseCall) {
+          switch (call.getKind()) {
+          case AND:
+          case CASE:
+            final Boolean b = this.unknownAsFalseMap.get(call);
+            if (b == null) {
+              // Top operator
+              unknownAsFalseCall = true;
+            } else {
+              unknownAsFalseCall = b;
+            }
+            break;
+          default:
+            unknownAsFalseCall = false;
+          }
+          for (RexNode operand : call.operands) {
+            this.unknownAsFalseMap.put(operand, unknownAsFalseCall);
+          }
+        }
+        RexNode node = super.visitCall(call);
+        RexNode simplifiedNode =
+            simplify.withUnknownAsFalse(unknownAsFalseCall)
+                .simplify(node);
+        if (node == simplifiedNode) {
+          return node;
+        }
+        if (simplifiedNode.getType().equals(call.getType())) {
+          return simplifiedNode;
+        }
+        return simplify.rexBuilder.makeCast(call.getType(), simplifiedNode, matchNullability);
+      }
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 23b93cd..3c8e49e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -2390,18 +2390,23 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
         else {
           fullyQualifiedTabName = tabMetaData.getTableName();
         }
-        RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName,
-            rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf,
-            partitionCache, colStatsCache, noColsMissingStats);
+
         // Build Druid query
         String address = HiveConf.getVar(conf,
            HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
         String dataSource = tabMetaData.getParameters().get(Constants.DRUID_DATA_SOURCE);
         Set<String> metrics = new HashSet<>();
+        RexBuilder rexBuilder = cluster.getRexBuilder();
+        RelDataTypeFactory dtFactory = rexBuilder.getTypeFactory();
         List<RelDataType> druidColTypes = new ArrayList<>();
         List<String> druidColNames = new ArrayList<>();
         for (RelDataTypeField field : rowType.getFieldList()) {
-          druidColTypes.add(field.getType());
+          if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(field.getName())) {
+            // Druid's time column is always not null.
+            druidColTypes.add(dtFactory.createTypeWithNullability(field.getType(), false));
+          } else {
+            druidColTypes.add(field.getType());
+          }
           druidColNames.add(field.getName());
           if (field.getName().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
             // timestamp
@@ -2413,6 +2418,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
           }
           metrics.add(field.getName());
         }
+        rowType = dtFactory.createStructType(druidColTypes, druidColNames);
         // TODO: Default interval will be an Interval once Calcite 1.15.0 is released.
         // We will need to update the type of this list.
         List intervals = Arrays.asList(DruidTable.DEFAULT_INTERVAL);
@@ -2420,6 +2426,9 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
         DruidTable druidTable = new DruidTable(new DruidSchema(address, address, false),
             dataSource, RelDataTypeImpl.proto(rowType), metrics, DruidTable.DEFAULT_TIMESTAMP_COLUMN,
             intervals, null, null);
+        RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName,
+            rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf,
+            partitionCache, colStatsCache, noColsMissingStats);
         final TableScan scan = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION),
             optTable, null == tableAlias ? tabMetaData.getTableName() : tableAlias,
             getAliasId(tableAlias, qb), HiveConf.getBoolVar(conf,
diff --git ql/src/test/queries/clientpositive/druid_timeseries.q ql/src/test/queries/clientpositive/druid_timeseries.q
index f784f26..0a1ab1e 100644
--- ql/src/test/queries/clientpositive/druid_timeseries.q
+++ ql/src/test/queries/clientpositive/druid_timeseries.q
@@ -92,3 +92,12 @@ FROM
 ) subq
 WHERE subq.h BETWEEN CAST('2010-01-01 00:00:00' AS TIMESTAMP WITH LOCAL TIME ZONE) AND
       CAST('2014-01-01 00:00:00' AS TIMESTAMP WITH LOCAL TIME ZONE);
+
+-- Simplification of count(__time) as count(*) since time column is not null
+EXPLAIN SELECT count(`__time`) from druid_table_1;
+
+EXPLAIN SELECT count(`__time`) from druid_table_1 where `__time` <= '2010-01-01 00:00:00 UTC';
+
+EXPLAIN SELECT count(`__time`) from druid_table_1 where `__time` >= '2010-01-01 00:00:00';
+
+EXPLAIN SELECT count(`__time`) from druid_table_1 where `__time` <= '2010-01-01 00:00:00' OR `__time` <= '2012-03-01 00:00:00';
diff --git ql/src/test/results/clientpositive/druid_timeseries.q.out ql/src/test/results/clientpositive/druid_timeseries.q.out
index 330c068..491f482 100644
--- ql/src/test/results/clientpositive/druid_timeseries.q.out
+++ ql/src/test/results/clientpositive/druid_timeseries.q.out
@@ -537,3 +537,99 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: EXPLAIN SELECT count(`__time`) from druid_table_1
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT count(`__time`) from druid_table_1
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: druid_table_1
+          properties:
+            druid.query.json {"queryType":"timeseries","dataSource":"wikipedia","descending":false,"granularity":"all","aggregations":[{"type":"count","name":"$f0"}],"intervals":["1900-01-01T00:00:00.000/3000-01-01T00:00:00.000"],"context":{"skipEmptyBuckets":true}}
+            druid.query.type timeseries
+          Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+          Select Operator
+            expressions: $f0 (type: bigint)
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+            ListSink
+
+PREHOOK: query: EXPLAIN SELECT count(`__time`) from druid_table_1 where `__time` <= '2010-01-01 00:00:00 UTC'
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT count(`__time`) from druid_table_1 where `__time` <= '2010-01-01 00:00:00 UTC'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: druid_table_1
+          properties:
+            druid.query.json {"queryType":"timeseries","dataSource":"wikipedia","descending":false,"granularity":"all","aggregations":[{"type":"count","name":"$f0"}],"intervals":["1900-01-01T00:00:00.000/2010-01-01T00:00:00.001"],"context":{"skipEmptyBuckets":true}}
+            druid.query.type timeseries
+          Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+          Select Operator
+            expressions: $f0 (type: bigint)
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+            ListSink
+
+PREHOOK: query: EXPLAIN SELECT count(`__time`) from druid_table_1 where `__time` >= '2010-01-01 00:00:00'
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT count(`__time`) from druid_table_1 where `__time` >= '2010-01-01 00:00:00'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: druid_table_1
+          properties:
+            druid.query.json {"queryType":"timeseries","dataSource":"wikipedia","descending":false,"granularity":"all","aggregations":[{"type":"count","name":"$f0"}],"intervals":["2010-01-01T08:00:00.000/3000-01-01T00:00:00.000"],"context":{"skipEmptyBuckets":true}}
+            druid.query.type timeseries
+          Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+          Select Operator
+            expressions: $f0 (type: bigint)
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+            ListSink
+
+PREHOOK: query: EXPLAIN SELECT count(`__time`) from druid_table_1 where `__time` <= '2010-01-01 00:00:00' OR `__time` <= '2012-03-01 00:00:00'
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN SELECT count(`__time`) from druid_table_1 where `__time` <= '2010-01-01 00:00:00' OR `__time` <= '2012-03-01 00:00:00'
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: druid_table_1
+          properties:
+            druid.query.json {"queryType":"timeseries","dataSource":"wikipedia","descending":false,"granularity":"all","aggregations":[{"type":"count","name":"$f0"}],"intervals":["1900-01-01T00:00:00.000/2012-03-01T08:00:00.001"],"context":{"skipEmptyBuckets":true}}
+            druid.query.type timeseries
+          Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+          Select Operator
+            expressions: $f0 (type: bigint)
+            outputColumnNames: _col0
+            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+            ListSink
+