From b1a3139d01b6d68e43c9ebd2ba266fb835032ca1 Mon Sep 17 00:00:00 2001 From: xiejialong Date: Wed, 4 Jan 2023 11:20:10 +0800 Subject: [PATCH] =?UTF-8?q?segment=E8=BF=87=E6=BB=A4=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E5=88=86=E5=8C=BA=E8=BF=87=E6=BB=A4=E4=BB=8E?= =?UTF-8?q?=E8=80=8C=E6=8F=90=E5=8D=87cuboid=E7=9A=84=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E7=B2=92=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/kylin/common/KylinConfigBase.java | 13 + .../apache/kylin/storage/StorageContext.java | 8 + .../gtrecord/GTCubeStorageQueryBase.java | 14 +- .../spark/sql/KylinDataFrameManager.scala | 6 +- .../sql/execution/datasource/FilePruner.scala | 26 +- .../query/runtime/plans/AggregatePlan.scala | 2 + .../query/runtime/plans/TableScanPlan.scala | 11 +- .../storage/spark/HadoopFileStorageQuery.java | 27 +- .../kylin/query/relnode/OLAPAggregateRel.java | 2 +- .../kylin/query/relnode/OLAPContext.java | 6 + .../kylin/query/relnode/OLAPFilterRel.java | 864 +++++++++++++++++- .../kylin/query/relnode/OLAPJoinRel.java | 2 +- .../kylin/query/relnode/OLAPLimitRel.java | 2 +- .../query/relnode/OLAPNonEquiJoinRel.java | 2 +- .../kylin/query/relnode/OLAPProjectRel.java | 2 +- .../apache/kylin/query/relnode/OLAPRel.java | 4 +- .../kylin/query/relnode/OLAPSortRel.java | 2 +- .../kylin/query/relnode/OLAPTableScan.java | 2 +- .../kylin/query/relnode/OLAPUnionRel.java | 2 +- .../kylin/query/relnode/OLAPValuesRel.java | 2 +- .../kylin/query/relnode/OLAPWindowRel.java | 2 +- .../kylin/rest/service/QueryService.java | 10 +- 22 files changed, 971 insertions(+), 40 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index fc46325cb5..8019447be0 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ 
-2818,4 +2818,17 @@ public abstract class KylinConfigBase implements Serializable { public boolean isEnabledNoAggQuery() { return Boolean.parseBoolean(getOptional("kylin.query.enable-no-aggregate-query", FALSE)); } + + /*** + * For queries that contain partition column in filter, but don't contain partition column in aggregation groups. + * Take the following query as an example (part_dt is partition column): + * select ops_user_id, sum(price) from kylin_sales where ops_user_id = 'ADMIN' and part_dt > '2012-01-03' + * KYLIN will choose cuboid: [ops_user_id, part_dt], we can change this query into two parts: + * 1. for segments whose data's part_dt column is all greater than 2012-01-03, we can choose cuboid: [ops_user_id] + * 2. for segments whose data's part_dt column is only partly greater than 2012-01-03, we can choose cuboid: [ops_user_id, part_dt] + * With this optimization on, large queries (long time span) can reduce the amount of data reading and computing + ***/ + public boolean removePartitionDimensionDynamically() { + return Boolean.valueOf(getOptional("kylin.query.remove.partition.dimension.dynamically", "false")); + }; } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index 6d1e2a21ab..f81636e773 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -18,6 +18,8 @@ package org.apache.kylin.storage; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.kylin.common.StorageURL; @@ -57,6 +59,7 @@ public class StorageContext { private IStorageQuery storageQuery; private AtomicLong processedRowCount = new AtomicLong(); private Cuboid cuboid; + private List cuboidList = new ArrayList<>(); private CuboidToGridTableMapping mapping; private boolean partialResultReturned = false; @@ 
-185,12 +188,17 @@ public class StorageContext { public void setCuboid(Cuboid c) { cuboid = c; + cuboidList.add(c); } public Cuboid getCuboid() { return cuboid; } + public List getCuboidList() { + return cuboidList; + } + public CuboidToGridTableMapping getMapping() { return mapping; } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 35293d81a0..598df6f956 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -187,7 +187,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } protected void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection dimensions, - Collection metrics) { + Collection metrics, Set filterCols) { for (FunctionDesc func : sqlDigest.aggregations) { if (!func.isDimensionAsMetric() && !FunctionDesc.FUNC_GROUPING.equalsIgnoreCase(func.getExpression())) { // use the FunctionDesc from cube desc as much as possible, that has more info such as HLLC precision @@ -198,15 +198,25 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { for (TblColRef column : sqlDigest.allColumns) { // skip measure columns if ((sqlDigest.metricColumns.contains(column) || sqlDigest.rtMetricColumns.contains(column)) - && !(sqlDigest.groupbyColumns.contains(column) || sqlDigest.filterColumns.contains(column) + && !(sqlDigest.groupbyColumns.contains(column) || filterCols.contains(column) || sqlDigest.rtDimensionColumns.contains(column))) { continue; } + //skip partition columns if possible + if (!sqlDigest.groupbyColumns.contains(column) && !filterCols.contains(column)) { + continue; + } + dimensions.add(column); } } + protected void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection dimensions, + Collection metrics) { + 
buildDimensionsAndMetrics(sqlDigest, dimensions, metrics, sqlDigest.filterColumns); + } + private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) { for (MeasureDesc measure : cubeDesc.getMeasures()) { if (measure.getFunction().equals(aggrFunc)) diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala index 153786cd33..9f062b7506 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql -import org.apache.kylin.cube.CubeInstance +import org.apache.kylin.cube.{CubeInstance, CubeSegment} import org.apache.kylin.cube.cuboid.Cuboid import org.apache.spark.sql.types.StructType import org.apache.spark.sql.execution.datasource.FilePruner @@ -65,11 +65,11 @@ class KylinDataFrameManager(sparkSession: SparkSession) { this } - def cuboidTable(cubeInstance: CubeInstance, layout: Cuboid): DataFrame = { + def cuboidTable(cubeInstance: CubeInstance, layout: Cuboid, cubeSegment: List[CubeSegment] = null): DataFrame = { option("project", cubeInstance.getProject) option("cubeId", cubeInstance.getUuid) option("cuboidId", layout.getId) - val indexCatalog = new FilePruner(cubeInstance, layout, sparkSession, options = extraOptions.toMap) + val indexCatalog = new FilePruner(cubeInstance, layout, sparkSession, options = extraOptions.toMap, cubeSegment = cubeSegment) sparkSession.baseRelationToDataFrame( HadoopFsRelation( indexCatalog, diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala index 34fc967c29..223a7c4c4a 100644 
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala @@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp} import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.kylin.common.util.DateFormat import org.apache.kylin.cube.cuboid.Cuboid -import org.apache.kylin.cube.CubeInstance +import org.apache.kylin.cube.{CubeInstance, CubeSegment} import org.apache.kylin.engine.spark.metadata.cube.PathManager import org.apache.kylin.engine.spark.metadata.MetadataConverter import org.apache.kylin.metadata.model.{PartitionDesc, SegmentStatusEnum} @@ -73,7 +73,8 @@ case class ShardSpec(numShards: Int, class FilePruner(cubeInstance: CubeInstance, cuboid: Cuboid, val session: SparkSession, - val options: Map[String, String]) + val options: Map[String, String], + val cubeSegment: List[CubeSegment] = null) extends FileIndex with ResetShufflePartition with Logging { val MAX_SHARDING_SIZE_PER_TASK: Long = @@ -241,17 +242,24 @@ class FilePruner(cubeInstance: CubeInstance, require(isResolved) val startTime = System.nanoTime - val timePartitionFilters = getSegmentFilter(dataFilters, timePartitionColumn) - logInfo(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}") - var startT = System.currentTimeMillis() val fsc = ShardFileStatusCache.getFileStatusCache(session) logInfo(s"Get file status cache: ${System.currentTimeMillis() - startT}") - // segment pruning - var selected = afterPruning("segment", timePartitionFilters, segmentDirs) { - pruneSegments - } + var selected: Seq[SegmentDirectory] = + if (!cubeSegment.isEmpty) { + cubeSegment + .filter(_.getStatus.equals(SegmentStatusEnum.READY)).map(seg => { + SegmentDirectory(seg.getName, seg.getStorageLocationIdentifier, Nil) + }) + } else { + val timePartitionFilters = getSegmentFilter(dataFilters, timePartitionColumn) + 
logInfo(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}") + // segment pruning + afterPruning("segment", timePartitionFilters, segmentDirs) { + pruneSegments + } + } startT = System.currentTimeMillis() // fetch segment directories info in parallel diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala index 4354b5db78..b86a8c7216 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala @@ -243,6 +243,8 @@ object AggregatePlan extends LogEx { if (olapContext == null || olapContext.realization == null) return false if (!olapContext.realization.getConfig.needReplaceAggWhenExactlyMatched) return false + if (olapContext.skipReplaceAggWhenExactlyMatched) return false; + val cuboid = olapContext.storageContext.getCuboid if (cuboid == null) return false if (olapContext.hasJoin) return false diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/TableScanPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/TableScanPlan.scala index e60232a003..8e501e38a6 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/TableScanPlan.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/TableScanPlan.scala @@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap import org.apache.calcite.DataContext import org.apache.kylin.common.QueryContextFacade -import org.apache.kylin.cube.CubeInstance +import org.apache.kylin.cube.{CubeInstance, CubeSegment} import org.apache.kylin.metadata.model._ import 
org.apache.kylin.metadata.tuple.TupleInfo import org.apache.kylin.query.SchemaProcessor @@ -64,9 +64,10 @@ object TableScanPlan extends LogEx { olapContext.resetSQLDigest() val query = new HadoopFileStorageQuery(cubeInstance) val returnTupleInfo = olapContext.returnTupleInfo - val request = query.getStorageQueryRequest( + val requestAndFilter = query.getStorageQueryRequest( olapContext, returnTupleInfo) + val request = requestAndFilter._1 val cuboid = request.getCuboid val gridTableMapping = cuboid.getCuboidToGridTableMapping @@ -77,7 +78,11 @@ object TableScanPlan extends LogEx { import org.apache.kylin.query.implicits.implicits._ var df = SparderContext.getSparkSession.kylin .format("parquet") - .cuboidTable(cubeInstance, cuboid) + .cuboidTable(cubeInstance, cuboid, + if(requestAndFilter._2 == null) { + List[CubeSegment]() + } else + requestAndFilter._2.preCalculatedSegment.asScala.toList) .toDF(schemaNames: _*) // may have multi TopN measures. val topNMeasureIndexes = df.schema.fields.map(_.dataType).zipWithIndex.filter(_._1.isInstanceOf[ArrayType]).map(_._2) diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/storage/spark/HadoopFileStorageQuery.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/storage/spark/HadoopFileStorageQuery.java index 5ab9fc5a6f..c5a37acdfd 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/storage/spark/HadoopFileStorageQuery.java +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/storage/spark/HadoopFileStorageQuery.java @@ -19,6 +19,7 @@ package org.apache.kylin.storage.spark; import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.query.relnode.OLAPFilterRel; import org.apache.kylin.shaded.com.google.common.collect.Sets; import java.util.Collection; import java.util.LinkedHashSet; @@ -36,6 +37,7 @@ import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase; import 
org.apache.kylin.storage.gtrecord.GTCubeStorageQueryRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Tuple2; public class HadoopFileStorageQuery extends GTCubeStorageQueryBase { private static final Logger log = LoggerFactory.getLogger(HadoopFileStorageQuery.class); @@ -49,8 +51,8 @@ public class HadoopFileStorageQuery extends GTCubeStorageQueryBase { throw new UnsupportedOperationException("Unsupported getGTStorage."); } - public GTCubeStorageQueryRequest getStorageQueryRequest(OLAPContext olapContext, - TupleInfo returnTupleInfo) { + public Tuple2 getStorageQueryRequest(OLAPContext olapContext, + TupleInfo returnTupleInfo) { StorageContext context = olapContext.storageContext; SQLDigest sqlDigest = olapContext.getSQLDigest(); context.setStorageQuery(this); @@ -62,12 +64,25 @@ public class HadoopFileStorageQuery extends GTCubeStorageQueryBase { notifyBeforeStorageQuery(sqlDigest); Collection groups = sqlDigest.groupbyColumns; - TupleFilter filter = sqlDigest.filter; + + + Set filterCols = sqlDigest.filterColumns; + olapContext.skipReplaceAggWhenExactlyMatched = false; + OLAPFilterRel rel = null; + if (null != olapContext.splitFilters && olapContext.splitFilters.size() >= 1) { + rel = olapContext.splitFilters.get(0); + olapContext.splitFilters.remove(0); + if (!rel.preCalculatedSegment.isEmpty()) { + filterCols = rel.filterColRefs; + olapContext.skipReplaceAggWhenExactlyMatched = true; + } + } // build dimension & metrics Set dimensions = new LinkedHashSet<>(); Set metrics = new LinkedHashSet<>(); - buildDimensionsAndMetrics(sqlDigest, dimensions, metrics); + + buildDimensionsAndMetrics(sqlDigest, dimensions, metrics, filterCols); // all dimensions = groups + other(like filter) dimensions Set otherDims = Sets.newHashSet(dimensions); @@ -86,7 +101,7 @@ public class HadoopFileStorageQuery extends GTCubeStorageQueryBase { Cuboid cuboid = findCuboid(cubeInstance, dimensionsD, metrics); log.info("For OLAPContext {}, need cuboid {}, hit 
cuboid {}, level diff is {}.", olapContext.id, cuboid.getInputID() , cuboid.getId(), Long.bitCount(cuboid.getInputID() ^ cuboid.getId())); context.setCuboid(cuboid); - return new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, null, null, null, - metrics, null, null, null, context); + return new Tuple2(new GTCubeStorageQueryRequest(cuboid, dimensionsD, groupsD, null, null, null, + metrics, null, null, null, context), rel); } } diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index 7e2483849e..7e17553428 100755 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ -404,7 +404,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { } @Override - public void implementRewrite(RewriteImplementor implementor) { + public void implementRewrite(RelNode parent, RewriteImplementor implementor) { // only rewrite the innermost aggregation if (needRewrite()) { translateAggregation(); diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java index 9fa447ab44..a9881a4667 100755 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java @@ -71,6 +71,8 @@ public class OLAPContext { static final InternalThreadLocal> _localContexts = new InternalThreadLocal<>(); + public boolean removePartitionFilter = false; + public static void setParameters(Map parameters) { _localPrarameters.set(parameters); } @@ -162,6 +164,10 @@ public class OLAPContext { List sortColumns; List sortOrders; + public List splitFilters = new ArrayList<>(); + public boolean skipReplaceAggWhenExactlyMatched = false; + + // rewrite info public Map rewriteFields = new HashMap<>(); diff --git 
a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java index bfd6c4ddf0..33ebabd454 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java @@ -18,15 +18,30 @@ package org.apache.kylin.query.relnode; +import java.sql.Timestamp; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import com.google.common.collect.ImmutableSet; import org.apache.calcite.adapter.enumerable.EnumerableCalc; import org.apache.calcite.adapter.enumerable.EnumerableConvention; import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.avatica.util.TimeUnitRange; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelTrait; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; @@ -35,27 +50,64 @@ import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexLocalRef; import org.apache.calcite.rex.RexNode; +import 
org.apache.calcite.rex.RexOver; +import org.apache.calcite.rex.RexPatternFieldRef; import org.apache.calcite.rex.RexProgram; import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.rex.RexRangeRef; +import org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexVisitor; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.util.NlsString; +import org.apache.commons.lang3.time.FastDateFormat; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.metadata.filter.FilterOptimizeTransformer; import org.apache.kylin.metadata.filter.LogicalTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.PartitionDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.metadata.realization.SQLDigest; import org.apache.kylin.query.relnode.visitor.TupleFilterVisitor; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; /** */ public class OLAPFilterRel extends Filter implements OLAPRel { + public static final int ONE_DAY_MS = 86400 * 1000; + private static final Set DAY_FORMATE = ImmutableSet.of("yyyyMMdd", "yyyy-MM-dd"); + ColumnRowType columnRowType; OLAPContext context; boolean autoJustTimezone = KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone().length() > 0; + boolean doOptimizePartition = true; + 
public Set filterColRefs = new HashSet<>(); + public List preCalculatedSegment = new ArrayList<>(); + private FastDateFormat format; + private RexBuilder builder = new RexBuilder(new JavaTypeFactoryImpl()); + private InputRefVisitor inputRefVisitor = new InputRefVisitor(); public OLAPFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) { super(cluster, traits, child, condition); @@ -164,13 +216,152 @@ public class OLAPFilterRel extends Filter implements OLAPRel { } @Override - public void implementRewrite(RewriteImplementor implementor) { + public void implementRewrite(RelNode parent, RewriteImplementor implementor) { + boolean needOptimizeFilter = false; + boolean canOptimizeFilter = true; + List querySegments = null; + SegmentFilter segmentFilter = null; + + if (doOptimizePartition && checkQueryCanOpt(this.getContext().realization)) { + try { + List columnDescs = null; + if (this.getInput() instanceof OLAPTableScan) { + OLAPTableScan olapTableScan = (OLAPTableScan) this.getInput(); + columnDescs = olapTableScan.olapTable.getSourceColumns(); + } else { + canOptimizeFilter = false; + } + //partition column index + int parIdx = getParIdx(columnDescs); + segmentFilter = extractSegmentFilter((RexCall) this.getCondition(), parIdx); + if (canOptimizeFilter && segmentFilter.segmentFilter != null) { + PartitionDesc partitionDesc = this.getContext().realization.getModel().getPartitionDesc(); + this.format = DateFormat.getDateFormat(partitionDesc.getPartitionDateFormat()); + querySegments = calSegments(segmentFilter.segmentFilter, + (CubeInstance) this.getContext().realization); + for (DetailedSegments querySegment : querySegments) { + if (querySegment.fullyUsed()) { + needOptimizeFilter = true; + break; + } + } + } + } catch (Exception e) { + logger.warn("failed to remove partition dimension dynamically will fall back", e); + canOptimizeFilter = false; + } + } + + if (canOptimizeFilter && needOptimizeFilter) { + /** + * the first part is fully matched and needs no partition filtering 
+ * the second part still requires partition filtering + */ + Pair splits = generateSplitFilters(querySegments, segmentFilter); + rewriteNodeWithPartitonSplit(parent, implementor, querySegments, splits); + } else { + originFilterRewrite(implementor); + } + } + + private boolean checkQueryCanOpt(IRealization realization) { + try { + if (realization == null || realization.getModel() == null + || !realization.getConfig().removePartitionDimensionDynamically()) + return false; + final PartitionDesc partitionDesc = realization.getModel().getPartitionDesc(); + if (partitionDesc != null && partitionDesc.getPartitionDateColumnRef() != null + && partitionDesc.getPartitionTimeColumnRef() == null) { + final String partitionDateFormat = partitionDesc.getPartitionDateFormat(); + if (DAY_FORMATE.contains(partitionDateFormat)) { + SQLDigest sqlDigest = context.sqlDigest; + CubeInstance cubeInstance = (CubeInstance) context.realization; + TblColRef partitionCol = cubeInstance.getModel().getPartitionDesc().getPartitionDateColumnRef(); + return !sqlDigest.groupbyColumns.contains(partitionCol) + && sqlDigest.filterColumns.contains(partitionCol); + } + } + }catch (Exception e){ + logger.warn("check filter optimize fail {}", e); + return false; + } + return false; + } + + private int getParIdx(List columnDescs) { + int parIdx = 0; + CubeInstance realization = (CubeInstance) this.getContext().realization; + for (ColumnDesc col : columnDescs) { + if (col.equals(realization.getModel().getPartitionDesc().getPartitionDateColumnRef().getColumnDesc())) { + break; + } + parIdx++; + } + return parIdx; + } + + private void originFilterRewrite(RewriteImplementor implementor) { implementor.visitChild(this, getInput()); this.rowType = this.deriveRowType(); this.columnRowType = buildColumnRowType(); } + private void rewriteNodeWithPartitonSplit(RelNode parent, RewriteImplementor implementor, + List segments, Pair splits) { + OLAPFilterRel fullMatchedRel = null; + OLAPFilterRel partialMatchedRel = null; + if (splits.getFirst() != null) 
{ //full matched filter is not empty, we need to split filter + fullMatchedRel = (OLAPFilterRel) this.copy(traitSet, this.getInput(), splits.getFirst()); + if (null != context.filterColumns) { + fullMatchedRel.filterColRefs.addAll(context.filterColumns); + } + fullMatchedRel.context = this.getContext(); + fullMatchedRel.doOptimizePartition = false; + + List fullyUsed = new LinkedList<>(); + for (DetailedSegments querySegment : segments) { + if (querySegment.fullyUsed()) { + fullyUsed.add(querySegment.segment); + } + } + fullMatchedRel.preCalculatedSegment = fullyUsed; + fullMatchedRel.filterColRefs.addAll(context.filterColumns); + fullMatchedRel.filterColRefs + .remove(context.realization.getModel().getPartitionDesc().getPartitionDateColumnRef()); + } + if (splits.getSecond() != null) { + partialMatchedRel = (OLAPFilterRel) this.copy(traitSet, this.getInput(), ((RexCall) this.getCondition()) + .clone(this.getCondition().getType(), Lists.newArrayList(splits.getSecond()))); + partialMatchedRel.context = this.getContext(); + partialMatchedRel.doOptimizePartition = false; + } + + int i = 0; + for (; i < parent.getInputs().size(); i++) { + if (parent.getInputs().get(i).getId() == this.getId()) { + break; + } + } + logger.info("Filter rewrite :\n all matched:{} ,segment:{} .\n partition filter {} ", + RelOptUtil.toString(fullMatchedRel), fullMatchedRel.preCalculatedSegment, + RelOptUtil.toString(partialMatchedRel)); + + if (partialMatchedRel == null) { + RelNode toReplace = fullMatchedRel; + parent.replaceInput(i, toReplace); + this.context.splitFilters.add(fullMatchedRel); + implementor.visitChild(parent, toReplace); + } else { + OLAPUnionRel rel = new OLAPUnionRel(this.getCluster(), this.getTraitSet(), + Lists.newArrayList(fullMatchedRel, partialMatchedRel), true); + parent.replaceInput(i, rel); + this.context.splitFilters.add(fullMatchedRel); + this.context.splitFilters.add(partialMatchedRel); + implementor.visitChild(parent, rel); + } + } + @Override public 
OLAPContext getContext() { return context; @@ -199,4 +390,675 @@ public class OLAPFilterRel extends Filter implements OLAPRel { return super.explainTerms(pw).item("ctx", context == null ? "" : String.valueOf(context.id) + "@" + context.realization); } + + private List calSegments(RexCall call, CubeInstance cubeInstance) throws Exception { + List segments = cubeInstance.getSegments(SegmentStatusEnum.READY).stream() + .map(seg -> new DetailedSegments(seg, false)).collect(Collectors.toList()); + switch (call.getKind()) { + case AND: { + List querySegments = null; + for (RexNode operand : call.getOperands()) { + List childSegments = calSegments((RexCall) operand, cubeInstance); + if (querySegments == null) { + querySegments = childSegments; + } else { + querySegments = intersectSegments(querySegments, childSegments); + } + } + return querySegments == null ? new LinkedList<>() : querySegments; + } + case OR: { + List querySegments = new LinkedList<>(); + for (RexNode operand : call.getOperands()) { + List childSegments = calSegments((RexCall) operand, cubeInstance); + querySegments = unionSegments(querySegments, childSegments); + } + return querySegments; + } + case EQUALS: { + long time = getTimeMillisFromLiteral(call.getOperands().get(1)); + LinkedList realQuerySegments = new LinkedList<>(); + segments.forEach(seg -> { + if (seg.start <= time && time < seg.end) { + DetailedSegments partial = new DetailedSegments(seg.segment, false); + partial.addSegmentEquals(time); + realQuerySegments.add(partial); + } + }); + return realQuerySegments; + } + case GREATER_THAN: { + long time = getTimeMillisFromLiteral(call.getOperands().get(1)); + LinkedList realQuerySegments = new LinkedList<>(); + segments.forEach(seg -> { + if (seg.start > time) { + realQuerySegments.add(new DetailedSegments(seg.segment, true)); + } else if (seg.start <= time && (seg.end - seg.segmentUnit) > time) { + DetailedSegments partial = new DetailedSegments(seg.segment, false); + 
partial.addSegmentGreaterThan(time); + realQuerySegments.add(partial); + } + }); + return realQuerySegments; + } + case GREATER_THAN_OR_EQUAL: { + long time = getTimeMillisFromLiteral(call.getOperands().get(1)); + LinkedList realQuerySegments = new LinkedList<>(); + segments.forEach(seg -> { + if (seg.start >= time) { + realQuerySegments.add(new DetailedSegments(seg.segment, true)); + } else if (seg.start <= time && seg.end > time) { + DetailedSegments partial = new DetailedSegments(seg.segment, false); + partial.addSegmentGreaterOrEqualThan(time); + realQuerySegments.add(partial); + } + }); + return realQuerySegments; + } + case LESS_THAN: { + long time = getTimeMillisFromLiteral(call.getOperands().get(1)); + LinkedList realQuerySegments = new LinkedList<>(); + segments.forEach(seg -> { + if (seg.end <= time) { + realQuerySegments.add(new DetailedSegments(seg.segment, true)); + } else if (seg.start < time && seg.end > time) { + DetailedSegments partial = new DetailedSegments(seg.segment, false); + partial.addSegmentLessThan(time); + realQuerySegments.add(partial); + } + }); + return realQuerySegments; + } + case LESS_THAN_OR_EQUAL: { + long time = getTimeMillisFromLiteral(call.getOperands().get(1)); + LinkedList realQuerySegments = new LinkedList<>(); + segments.forEach(seg -> { + if (seg.end - ONE_DAY_MS <= time) { + realQuerySegments.add(new DetailedSegments(seg.segment, true)); + } else if (seg.start <= time && seg.end - ONE_DAY_MS > time) { + DetailedSegments partial = new DetailedSegments(seg.segment, false); + partial.addSegmentLessOrEqualThan(time); + realQuerySegments.add(partial); + } + }); + return realQuerySegments; + } + default: + throw new UnsupportedOperationException("unsupported"); + } + } + + /** + * + * @param filter + * @param parIdx + * @return + * If segment column expression can be extracted in one group return true, else return false + * e.g. 
"where partition >= xxx and (a = xxx or partition <= xxx) " will return false + * "where partition >= xxx and (partition = xxx or partition <= xxx) and a=xxx " will return false + * because OR is not supported + */ + private SegmentFilter extractSegmentFilter(RexCall filter, int parIdx) { + List children = new LinkedList<>(); + switch (filter.getKind()) { + case AND: { + RexInputRef parRef = null; + for (RexNode child : filter.getOperands()) { + SegmentFilter childFilter = extractSegmentFilter((RexCall) child, parIdx); + if (!childFilter.canOpt) + return SegmentFilter.createNoOptSegment(); + if (childFilter.segmentFilter != null) { + parRef = childFilter.parInputRef; + } + children.add(childFilter); + } + List segmentFilterNodes = children.stream().map(e -> e.segmentFilter).filter(e -> e != null) + .collect(Collectors.toList()); + if (segmentFilterNodes.size() > 1) { + return SegmentFilter.createOptSegment(filter, + (RexCall) builder.makeCall(SqlStdOperatorTable.AND, segmentFilterNodes), parRef, children); + } else if (segmentFilterNodes.size() == 1) { + return SegmentFilter.createOptSegment(filter, segmentFilterNodes.get(0), parRef, children); + } else { + return SegmentFilter.createOptSegment(filter, null, null, children); + } + } + case OR: { + for (RexNode child : filter.getOperands()) { + SegmentFilter childFilter = extractSegmentFilter((RexCall) child, parIdx); + if (!childFilter.canOpt) + return SegmentFilter.createNoOptSegment(); + if (childFilter.segmentFilter != null) { + return SegmentFilter.createNoOptSegment(); + } + children.add(childFilter); + } + return SegmentFilter.createOptSegment(filter, null, null, children); + } + default: + List inputs = filter.accept(inputRefVisitor); + if (inputs == null) { + return SegmentFilter.createNoOptSegment(); + } else { + Set inputIdxs = inputs.stream().map(e -> e.getIndex()).collect(Collectors.toSet()); + if (inputIdxs.contains(parIdx)) { + if (inputIdxs.size() == 1) { + return 
SegmentFilter.createOptSegment(filter, parseCastFilter(filter), inputs.get(0), + new LinkedList<>()); + } else { + return SegmentFilter.createNoOptSegment(); + } + } else + return SegmentFilter.createOptSegment(filter, null, null, new LinkedList<>()); + } + } + } + + private RexCall parseCastFilter(RexCall filter) { + RexLiteral literal = null; + RexNode castValue = null; + if (filter.getOperands().size() == 2) { + final RexNode first = filter.getOperands().get(0); + final RexNode second = filter.getOperands().get(1); + if (first.getKind() == SqlKind.LITERAL && second.getKind() == SqlKind.CAST + && ((RexCall) second).getOperands().get(0).getKind() == SqlKind.INPUT_REF) { + literal = (RexLiteral) first; + castValue = ((RexCall) second).getOperands().get(0); + } else if (second.getKind() == SqlKind.LITERAL && first.getKind() == SqlKind.CAST + && ((RexCall) first).getOperands().get(0).getKind() == SqlKind.INPUT_REF) { + castValue = ((RexCall) first).getOperands().get(0); + literal = (RexLiteral) second; + } + + } + if (literal != null && castValue != null) + return (RexCall) builder.makeCall(filter.getOperator(), Lists.newArrayList(castValue, literal)); + else + return filter; + } + + public RexNode createPartitionIrrelevantFilter(SegmentFilter equivalentFilter) { + if (equivalentFilter.children.isEmpty()) { + if (equivalentFilter.parInputRef == null) { + return equivalentFilter.origin; + } else { + return null; + } + } else { + List filter = new ArrayList<>(); + for (SegmentFilter child : equivalentFilter.children) { + RexNode childRex = createPartitionIrrelevantFilter(child); + if (null != childRex) { + filter.add(childRex); + } + } + if (filter.size() == 0) { + return null; + } else if (filter.size() == 1) { + return filter.get(0); + } else { + return builder.makeCall(equivalentFilter.op, filter); + } + } + } + + private RexNode createTimeRangeRexNode(RexInputRef parInputRef, long rangeStart, long rangeEnd) { + if (rangeEnd == rangeStart) { + return 
builder.makeCall(SqlStdOperatorTable.EQUALS, + Lists.newArrayList(parInputRef, builder.makeLiteral(format.format(rangeStart)))); + } else { + RexNode gte = builder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, + Lists.newArrayList(parInputRef, builder.makeLiteral(format.format(rangeStart)))); + RexNode lte = builder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, + Lists.newArrayList(parInputRef, builder.makeLiteral(format.format(rangeEnd)))); + return builder.makeCall(SqlStdOperatorTable.AND, Lists.newArrayList(gte, lte)); + } + } + + private long getTimeMillisFromLiteral(RexNode rexNode) throws ParseException { + Object value = getValue(rexNode); + if (value instanceof Date) { + return DateFormat.stringToMillis(value.toString()); + } + if (value instanceof String || value instanceof Integer || value instanceof Long) { + return format.parse(value.toString()).getTime(); + } + if (value instanceof Timestamp) { + return ((Timestamp) value).getTime(); + } + throw new UnsupportedOperationException("unsupport time"); + } + + public Object getValue(RexNode rexNode) { + if (rexNode instanceof RexLiteral) { + return getRexLiteral((RexLiteral) rexNode); + } + if (rexNode instanceof RexCall) { + switch (rexNode.getKind()) { + case CAST: { + RexLiteral literal = (RexLiteral) ((RexCall) rexNode).getOperands().get(0); + return getRexLiteral(literal); + } + default: + throw new UnsupportedOperationException("unsupported"); + } + } else { + throw new UnsupportedOperationException("unsupported"); + } + } + + public Object getRexLiteral(RexLiteral rexLiteral) { + Comparable literalValue = rexLiteral.getValue(); + if (literalValue instanceof NlsString) { + return ((NlsString) literalValue).getValue(); + } else if (literalValue instanceof GregorianCalendar) { + GregorianCalendar g = (GregorianCalendar) literalValue; + return g.getTimeInMillis(); + } else if (literalValue instanceof TimeUnitRange) { + // Extract(x from y) in where clause + throw new 
UnsupportedOperationException(); + } else if (literalValue == null) { + return null; + } else { + return literalValue.toString(); + } + } + + static class SegmentFilter { + //original RexNode information + RexNode origin; + RexCall segmentFilter; + SqlOperator op; + RexInputRef parInputRef; + boolean canOpt; + List children; + + private SegmentFilter(RexCall origin, RexCall segmentFilter, SqlOperator op, RexInputRef inputRef, + Boolean canOpt, List children) { + this.origin = origin; + this.op = op; + this.segmentFilter = segmentFilter; + this.canOpt = canOpt; + this.parInputRef = inputRef; + this.children = children; + } + + public static SegmentFilter createNoOptSegment() { + return new SegmentFilter(null, null, null, null, false, new LinkedList<>()); + } + + public static SegmentFilter createOptSegment(RexCall origin, RexCall segmentFilter, RexInputRef inputRef, + List children) { + return new SegmentFilter(origin, segmentFilter, origin.getOperator(), inputRef, true, children); + } + + } + + static class DetailedSegments { + CubeSegment segment; + BitSet querySegments; + //inclusive + long start; + //exclusive + long end; + int segmentSize; + boolean convertToPartlyUsed = false; + private long segmentUnit = ONE_DAY_MS; + + public DetailedSegments(CubeSegment segment, boolean fullyUsed) { + this.segment = segment; + this.end = (long) segment.getSegRange().end.v; + this.start = (long) segment.getSegRange().start.v; + this.segmentSize = (int) ((this.end - this.start) / segmentUnit); + this.querySegments = new BitSet(segmentSize); + if (fullyUsed) { + querySegments.set(0, segmentSize, true); + } + } + + public DetailedSegments(CubeSegment segment, BitSet bitSet) { + this.segment = segment; + this.end = (long) segment.getSegRange().end.v; + this.start = (long) segment.getSegRange().start.v; + this.segmentSize = (int) ((this.end - this.start) / segmentUnit); + this.querySegments = bitSet; + } + + public DetailedSegments intersect(DetailedSegments seg) { + if 
(!this.segment.equals(seg.segment)) { + throw new RuntimeException("can not intersect different segments"); + } + BitSet bs = new BitSet(segmentSize); + for (int i = 0; i < segmentSize; i++) { + bs.set(i, this.querySegments.get(i) & seg.querySegments.get(i)); + } + return new DetailedSegments(this.segment, bs); + } + + public DetailedSegments union(DetailedSegments target) { + if (!this.segment.equals(target.segment)) { + throw new RuntimeException("can not union different segments"); + } + BitSet bs = new BitSet(segmentSize); + for (int i = 0; i < segmentSize; i++) { + bs.set(i, this.querySegments.get(i) | target.querySegments.get(i)); + } + DetailedSegments result = new DetailedSegments(this.segment, bs); + if (result.fullyUsed() && (!this.fullyUsed() || !target.fullyUsed())) { + result.convertToPartlyUsed = true; + } + return result; + } + + public boolean fullyUsed() { + return !convertToPartlyUsed && querySegments.cardinality() == segmentSize; + } + + public DetailedSegments clone() { + return new DetailedSegments(this.segment, (BitSet) this.querySegments.clone()); + } + + public void addSegmentGreaterThan(long timeStamp) { + if (timeStamp >= end - segmentUnit) { + return; + } + if (timeStamp < start) { + querySegments.set(0, segmentSize, true); + return; + } + // (start + index * segmentUnit) <= timeStamp + int index = (int) ((timeStamp - start) / segmentUnit); + index = index + 1; + querySegments.set(index, segmentSize, true); + } + + public void addSegmentGreaterOrEqualThan(long timeStamp) { + if (timeStamp > end - segmentUnit) { + return; + } + if (timeStamp < start) { + querySegments.set(0, segmentSize, true); + return; + } + int index = (int) ((timeStamp - start) / segmentUnit); + // (start + index * segmentUnit) <= timeStamp + if ((start + index * segmentUnit) < timeStamp) { + index = index + 1; + } + querySegments.set(index, segmentSize, true); + } + + public void addSegmentLessThan(long timeStamp) { + if (timeStamp < start) { + return; + } + if 
(timeStamp > end - segmentUnit) { + querySegments.set(0, segmentSize - 1, true); + return; + } + int index = (int) ((timeStamp - start) / segmentUnit); + // (start + index * segmentUnit) <= timeStamp + if ((start + index * segmentUnit) == timeStamp) { + index = index - 1; + } + querySegments.set(0, index + 1, true); + } + + public void addSegmentLessOrEqualThan(long timeStamp) { + if (timeStamp < start) { + return; + } + if (timeStamp > end - segmentUnit) { + querySegments.set(0, segmentSize - 1, true); + return; + } + int index = (int) ((timeStamp - start) / segmentUnit); + querySegments.set(0, index + 1, true); + } + + public void addSegmentEquals(long timeStamp) { + int index = (int) ((timeStamp - start) / segmentUnit); + if ((start + index * segmentUnit) == timeStamp) { + querySegments.set(0, index, true); + } + } + + public boolean merged() { + return end - start != ONE_DAY_MS; + } + } + + public List intersectSegments(List seg1, List seg2) { + LinkedList result = new LinkedList<>(); + if (seg1.isEmpty() || seg2.isEmpty()) { + return result; + } + Collections.sort(seg1, Comparator.comparingLong(a -> a.start)); + Collections.sort(seg2, Comparator.comparingLong(a -> a.start)); + for (int i = 0, j = 0; i < seg1.size() && j < seg2.size();) { + if (seg1.get(i).start == seg2.get(j).start) { + DetailedSegments detailedSegments = seg1.get(i).intersect(seg2.get(j)); + result.add(detailedSegments); + i++; + j++; + } else if (seg1.get(i).start < seg2.get(j).start) { + i++; + } else { + j++; + } + } + return result; + } + + private List unionSegments(List segList1, List segList2) { + LinkedList result = new LinkedList<>(); + int i = 0; + int j = 0; + DetailedSegments seg1 = null; + DetailedSegments seg2 = null; + for (; i < segList1.size() && j < segList2.size();) { + if (null == seg1 || null == seg2) { + seg1 = segList1.get(i); + seg2 = segList2.get(j); + } + if (seg1.start == seg2.start) { + DetailedSegments detailedSegments = seg1.union(seg2); + 
result.add(detailedSegments); + if (++i < segList1.size()) { + seg1 = segList1.get(i); + } + if (++j < segList2.size()) { + seg2 = segList2.get(j); + } + } else if (seg1.start < seg2.start) { + if (seg1.fullyUsed()) { + seg1.convertToPartlyUsed = true; + } + result.add(seg1); + if (++i < segList1.size()) { + seg1 = segList1.get(i); + } + } else { + if (seg2.fullyUsed()) { + seg1.convertToPartlyUsed = true; + } + result.add(seg2); + if (++j < segList2.size()) { + seg2 = segList2.get(j); + } + } + } + if (i < segList1.size()) { + for (; i < segList1.size(); i++) { + seg1 = segList1.get(i); + if (seg1.fullyUsed()) { + seg1.convertToPartlyUsed = true; + } + result.add(seg1); + } + } else if (j < segList2.size()) { + for (; j < segList2.size(); j++) { + seg2 = segList2.get(j); + if (seg2.fullyUsed()) { + seg2.convertToPartlyUsed = true; + } + result.add(seg2); + } + } + return result; + } + + //return full matched filter and partial matched filter + private Pair generateSplitFilters(List querySegments, + SegmentFilter segmentFilter) { + RexNode partitionIrrelevantFilter = createPartitionIrrelevantFilter(segmentFilter); + RexNode filterWithPartialPar = null; + if (preCheckRewrite(querySegments)) { + RexNode segmentFilterNode = createSegmentFilter(querySegments, segmentFilter.parInputRef); + filterWithPartialPar = builder.makeCall(SqlStdOperatorTable.AND, + Lists.newArrayList(segmentFilterNode, partitionIrrelevantFilter)); + } + return new Pair<>(partitionIrrelevantFilter, filterWithPartialPar); + } + + private RexNode createSegmentFilter(List segments, RexInputRef parInputRef) { + long rangeStart = 0; + long rangeEnd = 0; + if (segments.isEmpty()) { + return null; + } + long segmentUnit = segments.get(0).segmentUnit; + List operands = new ArrayList<>(); + for (DetailedSegments segment : segments) { + if (segment.fullyUsed()) {//skip fully used segment + continue; + } + for (int i = 0; i < segment.querySegments.length(); i++) { + if (rangeStart == 0) { + if 
(segment.querySegments.get(i)) { + rangeStart = segment.start + i * segmentUnit; + rangeEnd = rangeStart; + } + continue; + } + if (!segment.querySegments.get(i)) { //query time range is discontinuous + operands.add(createTimeRangeRexNode(parInputRef, rangeStart, rangeEnd)); + rangeStart = segment.start + i * segmentUnit; + rangeEnd = segment.start + i * segmentUnit; + } else { + if ((rangeEnd + segmentUnit) == (segment.start + i * segmentUnit)) { //query time range is continuous + rangeEnd = segment.start + i * segmentUnit; + } else { + operands.add(createTimeRangeRexNode(parInputRef, rangeStart, rangeEnd)); + rangeStart = segment.start + i * segmentUnit; + rangeEnd = segment.start + i * segmentUnit; + } + } + } + } + if (rangeStart != 0) { + operands.add(createTimeRangeRexNode(parInputRef, rangeStart, rangeEnd)); + } + if (operands.size() == 0) { + return null; + } else if (operands.size() == 1) { + return operands.get(0); + } else { + return builder.makeCall(SqlStdOperatorTable.OR, operands); + } + } + + private boolean preCheckRewrite(List segments) { + //全是Segment全匹配,部分partiton匹配将不存在,不改写 + boolean containsPartlyUsedSegments = false; + for (DetailedSegments querySegment : segments) { + if (!querySegment.fullyUsed()) { + containsPartlyUsedSegments = true; + break; + } + } + + //全是Segment部分匹配,partiton不需要改写 + boolean queryPartialSegments = false; + for (DetailedSegments sg : segments) { + if (sg.fullyUsed()) { + queryPartialSegments = true; + break; + } + } + return containsPartlyUsedSegments && queryPartialSegments; + } + + private class InputRefVisitor implements RexVisitor> { + + @Override + public List visitInputRef(RexInputRef inputRef) { + List rexIds = new LinkedList<>(); + rexIds.add(inputRef); + return rexIds; + } + + @Override + public List visitLocalRef(RexLocalRef localRef) { + return null; + } + + @Override + public List visitLiteral(RexLiteral literal) { + return new LinkedList<>(); + } + + @Override + public List visitCall(RexCall call) { + List 
rexIds = new LinkedList<>(); + for (RexNode operand : call.getOperands()) { + List child = operand.accept(this); + if (child == null) + return null; + rexIds.addAll(child); + } + return rexIds; + } + + @Override + public List visitOver(RexOver over) { + return null; + } + + @Override + public List visitCorrelVariable(RexCorrelVariable correlVariable) { + return null; + } + + @Override + public List visitDynamicParam(RexDynamicParam dynamicParam) { + return null; + } + + @Override + public List visitRangeRef(RexRangeRef rangeRef) { + return null; + } + + @Override + public List visitFieldAccess(RexFieldAccess fieldAccess) { + return null; + } + + @Override + public List visitSubQuery(RexSubQuery subQuery) { + return null; + } + + @Override + public List visitTableInputRef(RexTableInputRef fieldRef) { + return null; + } + + @Override + public List visitPatternFieldRef(RexPatternFieldRef fieldRef) { + return null; + } + + } } diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java index 2baf2f1515..fa94e6ee0e 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java @@ -330,7 +330,7 @@ public class OLAPJoinRel extends EnumerableJoin implements OLAPRel { } @Override - public void implementRewrite(RewriteImplementor implementor) { + public void implementRewrite(RelNode parent, RewriteImplementor implementor) { implementor.visitChild(this, this.left); implementor.visitChild(this, this.right); diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java index 8e04859f48..26c25458e8 100755 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java @@ -108,7 +108,7 @@ public class OLAPLimitRel extends 
SingleRel implements OLAPRel { } @Override - public void implementRewrite(RewriteImplementor implementor) { + public void implementRewrite(RelNode parent, RewriteImplementor implementor) { implementor.visitChild(this, getInput()); this.rowType = this.deriveRowType(); diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPNonEquiJoinRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPNonEquiJoinRel.java index cb25f12328..8f752e3244 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPNonEquiJoinRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPNonEquiJoinRel.java @@ -132,7 +132,7 @@ public class OLAPNonEquiJoinRel extends EnumerableThetaJoin implements OLAPRel { } @Override - public void implementRewrite(RewriteImplementor implementor) { + public void implementRewrite(RelNode parent, RewriteImplementor implementor) { implementor.visitChild(this, this.left); implementor.visitChild(this, this.right); diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java index 155a586d76..290b74df41 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java @@ -266,7 +266,7 @@ public class OLAPProjectRel extends Project implements OLAPRel { } @Override - public void implementRewrite(RewriteImplementor implementor) { + public void implementRewrite(RelNode parent, RewriteImplementor implementor) { implementor.visitChild(this, getInput()); this.rewriting = true; diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPRel.java index a10ba253c3..8e0349a936 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPRel.java @@ -175,7 +175,7 @@ public interface OLAPRel extends 
RelNode { this.parentContext = olapRel.getContext(); } OLAPRel olapChild = (OLAPRel) child; - olapChild.implementRewrite(this); + olapChild.implementRewrite(parent, this); } public OLAPContext getParentContext() { @@ -198,7 +198,7 @@ public interface OLAPRel extends RelNode { } } - public void implementRewrite(RewriteImplementor rewriter); + public void implementRewrite(RelNode parent, RewriteImplementor rewriter); /** * implementor for java generation diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java index de05005abc..38b5db7fc4 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java @@ -92,7 +92,7 @@ public class OLAPSortRel extends Sort implements OLAPRel { } @Override - public void implementRewrite(RewriteImplementor implementor) { + public void implementRewrite(RelNode parent, RewriteImplementor implementor) { implementor.visitChild(this, getInput()); // No need to rewrite "order by" applied on non-olap context. 
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java index bbb4042c22..cd7c419ccf 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPTableScan.java @@ -458,7 +458,7 @@ public class OLAPTableScan extends TableScan implements OLAPRel, EnumerableRel { } @Override - public void implementRewrite(RewriteImplementor implementor) { + public void implementRewrite(RelNode parent, RewriteImplementor implementor) { Map rewriteFields = this.context.rewriteFields; for (Map.Entry rewriteField : rewriteFields.entrySet()) { String fieldName = rewriteField.getKey(); diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPUnionRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPUnionRel.java index 7366eeb8d2..7911bcdd61 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPUnionRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPUnionRel.java @@ -117,7 +117,7 @@ public class OLAPUnionRel extends Union implements OLAPRel { } @Override - public void implementRewrite(RewriteImplementor implementor) { + public void implementRewrite(RelNode parent, RewriteImplementor implementor) { for (RelNode child : getInputs()) { implementor.visitChild(this, child); } diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPValuesRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPValuesRel.java index 9bb05188de..c14183b55d 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPValuesRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPValuesRel.java @@ -120,7 +120,7 @@ public class OLAPValuesRel extends Values implements OLAPRel { } @Override - public void implementRewrite(RewriteImplementor rewriter) { + public void implementRewrite(RelNode parent, RewriteImplementor rewriter) { } diff --git 
a/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java index 7c9721af05..56ae8b7872 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java @@ -114,7 +114,7 @@ public class OLAPWindowRel extends Window implements OLAPRel { } @Override - public void implementRewrite(RewriteImplementor implementor) { + public void implementRewrite(RelNode parent, RewriteImplementor implementor) { for (RelNode child : getInputs()) { implementor.visitChild(this, child); } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 108a9f941c..d89c5d8397 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -322,12 +322,14 @@ public class QueryService extends BasicService { if (!response.isHitExceptionCache() && null != OLAPContext.getThreadLocalContexts()) { for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) { - Cuboid cuboid = ctx.storageContext.getCuboid(); - if (cuboid != null) { + List cuboids = ctx.storageContext.getCuboidList(); + if (!cuboids.isEmpty()) { //Some queries do not involve cuboid, e.g. lookup table query - cuboidIds.add(cuboid.getId()); + cuboids.forEach(cuboid -> { + cuboidIds.add(cuboid.getId()); + isExactlyMatchSet.add(ctx.isExactlyAggregate); + }); } - isExactlyMatchSet.add(ctx.isExactlyAggregate); if (ctx.realization != null) { realizationNames.add(ctx.realization.getCanonicalName()); -- 2.32.0 (Apple Git-132)