Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: v4.0.1
- Fix Version/s: None
- Component/s: None
Description
I created a cube on Kylin version 4.0.1. One of the measures is defined as RAW. When querying after the build, I found an inconsistency between the parquet schema and the Spark schema. During the cube build, the RAW measure is written to parquet using Spark's max aggregation, whose result type is the child column's data type; in my cube, that child data type is decimal(19,4). However, when I query through SQL, RAW is uniformly mapped to BinaryType in TableScanPlan. Therefore, I wonder whether the StructType for RAW in TableScanPlan should also use the child column's dataType?
When building, the RAW measure's type is child.dataType:
@see org.apache.kylin.engine.spark.job.CuboidAggregator
measure.expression.toUpperCase(Locale.ROOT) match {
  case "MAX" => max(columns.head).as(id.toString)
  case "MIN" => min(columns.head).as(id.toString)
  case "SUM" => sum(columns.head).as(id.toString)
  case "COUNT" =>
    if (reuseLayout) {
      sum(columns.head).as(id.toString)
    } else {
      count(columns.head).as(id.toString)
    }
  case "COUNT_DISTINCT" =>
    // for test
    if (isSparkSql) {
      countDistinct(columns.head).as(id.toString)
    } else {
      val cdAggregate = getCountDistinctAggregate(columns, measure.returnType, reuseLayout)
      new Column(cdAggregate.toAggregateExpression()).as(id.toString)
    }
  case "TOP_N" =>
    // Uses new TopN aggregate function
    // located in kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
    val schema = StructType(measure.pra.map { col =>
      val dateType = col.dataType
      if (col == measure) {
        StructField(s"MEASURE_${col.columnName}", dateType)
      } else {
        StructField(s"DIMENSION_${col.columnName}", dateType)
      }
    })
    if (reuseLayout) {
      new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
        .toAggregateExpression()).as(id.toString)
    } else {
      new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr,
        columns.drop(1).map(_.expr))
        .toAggregateExpression()).as(id.toString)
    }
  case "PERCENTILE_APPROX" =>
    val udfName = UdfManager.register(measure.returnType.toKylinDataType,
      measure.expression, null, !reuseLayout)
    if (!reuseLayout) {
      callUDF(udfName, columns.head.cast(StringType)).as(id.toString)
    } else {
      callUDF(udfName, columns.head).as(id.toString)
    }
  case _ => max(columns.head).as(id.toString) // RAW matches here, but max's result dataType is child.dataType
}.toSeq
But at query time, the RAW StructType is BinaryType:
@see org.apache.kylin.query.runtime.plans.TableScanPlan ,org.apache.spark.sql.utils.SparkTypeUtil
def toSparkType(dataTp: DataType, isSum: Boolean = false): org.apache.spark.sql.types.DataType = {
  dataTp.getName match {
    // org.apache.spark.sql.catalyst.expressions.aggregate.Sum#resultType
    case "decimal" =>
      if (isSum) {
        val i = dataTp.getPrecision + 10
        DecimalType(Math.min(DecimalType.MAX_PRECISION, i), dataTp.getScale)
      } else DecimalType(dataTp.getPrecision, dataTp.getScale)
    case "date" => DateType
    case "time" => DateType
    case "timestamp" => TimestampType
    case "datetime" => DateType
    case "tinyint" => if (isSum) LongType else ByteType
    case "smallint" => if (isSum) LongType else ShortType
    case "integer" => if (isSum) LongType else IntegerType
    case "int4" => if (isSum) LongType else IntegerType
    case "bigint" => LongType
    case "long8" => LongType
    case "float" => if (isSum) DoubleType else FloatType
    case "double" => DoubleType
    case tp if tp.startsWith("varchar") => StringType
    case tp if tp.startsWith("char") => StringType
    case "dim_dc" => LongType
    case "boolean" => BooleanType
    case tp if tp.startsWith("hllc") => BinaryType
    case tp if tp.startsWith("bitmap") => BinaryType
    case tp if tp.startsWith("extendedcolumn") => BinaryType
    case tp if tp.startsWith("percentile") => BinaryType
    case tp if tp.startsWith("raw") => BinaryType
    case _ => throw new IllegalArgumentException(dataTp.toString)
  }
}
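To make the mismatch concrete, here is a minimal standalone sketch (not Kylin code; the object and method names are invented for illustration) that models the two code paths above as plain string mappings. The build side falls through to max, so the parquet column keeps the child column's type (decimal(19,4) in this report), while the query side maps anything starting with "raw" to BinaryType:

```scala
// Illustrative model of the two type-resolution paths; names are hypothetical.
object RawTypeMismatch {
  // Build side (CuboidAggregator): RAW hits the default `case _ => max(...)`,
  // so the column written to parquet keeps the child column's data type.
  def buildSideType(childDataType: String): String = childDataType

  // Query side (SparkTypeUtil.toSparkType): any return type starting with
  // "raw" is mapped to BinaryType in the table-scan schema.
  def querySideType(measureReturnType: String): String =
    if (measureReturnType.startsWith("raw")) "BinaryType"
    else measureReturnType

  def main(args: Array[String]): Unit = {
    val parquetType = buildSideType("decimal(19,4)") // what the build writes
    val scanType    = querySideType("raw")           // what the scan expects
    println(s"parquet schema: $parquetType, scan schema: $scanType")
    assert(parquetType != scanType) // the reported schema inconsistency
  }
}
```

Under this reading, the fix suggested in the description would make querySideType return the child column's type for RAW measures, so both paths agree.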