KYLIN-5200: Kylin4 RAW Schema written to Parquet and read from Parquet are inconsistent

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: v4.0.1
    • Fix Version/s: None
    • Component/s: Metadata
    • Labels: None

    Description

      I created a cube on Kylin 4.0.1 with one measure defined as RAW. After building, queries reveal an inconsistency between the Parquet schema and the Spark schema. At build time the RAW measure is written to Parquet through Spark's max aggregate, and Max's dataType is its child's dataType; in my cube that child type is decimal(19,4). At query time, however, TableScanPlan uniformly declares RAW as BinaryType. Should the StructType for RAW in TableScanPlan also use the child dataType?
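
      The mismatch can be reproduced outside Kylin with plain Spark (a minimal
      sketch; the column name and output path are invented for illustration):
      write a decimal(19,4) column to Parquet the way the build does, then read
      it back with a schema that declares the column as binary, the way
      TableScanPlan does for RAW.

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.types._

      val spark = SparkSession.builder().master("local[1]").getOrCreate()
      import spark.implicits._

      // Build side: the RAW measure is materialized with max(), so the Parquet
      // column keeps the child data type -- decimal(19,4) in this cube.
      val built = Seq(BigDecimal("12.3456")).toDF("raw_measure")
        .select($"raw_measure".cast(DecimalType(19, 4)).as("raw_measure"))
      built.write.mode("overwrite").parquet("/tmp/raw_measure_demo")

      // Query side: TableScanPlan declares the same column as BinaryType.
      // Depending on the Spark version this read fails outright (e.g. with a
      // Parquet schema-conversion error) or yields unusable bytes.
      val queried = spark.read
        .schema(StructType(Seq(StructField("raw_measure", BinaryType))))
        .parquet("/tmp/raw_measure_demo")
      queried.show()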

      At build time, the RAW type is child.dataType.
      @see org.apache.kylin.engine.spark.job.CuboidAggregator

      measure.expression.toUpperCase(Locale.ROOT) match {
              case "MAX" =>
                max(columns.head).as(id.toString)
              case "MIN" =>
                min(columns.head).as(id.toString)
              case "SUM" =>
                sum(columns.head).as(id.toString)
              case "COUNT" =>
                if (reuseLayout) {
                  sum(columns.head).as(id.toString)
                } else {
                  count(columns.head).as(id.toString)
                }
              case "COUNT_DISTINCT" =>
                // for test
                if (isSparkSql) {
                  countDistinct(columns.head).as(id.toString)
                } else {
                  val cdAggregate = getCountDistinctAggregate(columns, measure.returnType, reuseLayout)
                  new Column(cdAggregate.toAggregateExpression()).as(id.toString)
                }
              case "TOP_N" =>
                // Uses new TopN aggregate function
                // located in kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
                val schema = StructType(measure.pra.map { col =>
                  val dateType = col.dataType
                  if (col == measure) {
                    StructField(s"MEASURE_${col.columnName}", dateType)
                  } else {
                    StructField(s"DIMENSION_${col.columnName}", dateType)
                  }
                })
      
                if (reuseLayout) {
                  new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
                    .toAggregateExpression()).as(id.toString)
                } else {
                  new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr, columns.drop(1).map(_.expr))
                    .toAggregateExpression()).as(id.toString)
                }
              case "PERCENTILE_APPROX" =>
                val udfName = UdfManager.register(measure.returnType.toKylinDataType, measure.expression, null, !reuseLayout)
                if (!reuseLayout) {
                  callUDF(udfName, columns.head.cast(StringType)).as(id.toString)
                } else {
                  callUDF(udfName, columns.head).as(id.toString)
                }
              case _ =>
          max(columns.head).as(id.toString) // RAW matches this default case, but Max's dataType is child.dataType
            }
          }.toSeq
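
      That the fallback max() keeps the child type is easy to confirm in a
      spark-shell (a quick sketch; the column name is invented):

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.max
      import org.apache.spark.sql.types.DecimalType

      val spark = SparkSession.builder().master("local[1]").getOrCreate()
      import spark.implicits._

      val df = Seq(BigDecimal("12.3456")).toDF("c")
        .select($"c".cast(DecimalType(19, 4)).as("raw_measure"))

      // Max#dataType simply returns child.dataType, so the aggregated column
      // (and hence the cuboid's Parquet column) stays decimal(19,4).
      println(df.agg(max($"raw_measure")).schema)
      // StructType(StructField(max(raw_measure),DecimalType(19,4),true))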
      

      But at query time, the RAW StructType is BinaryType.
      @see org.apache.kylin.query.runtime.plans.TableScanPlan, org.apache.spark.sql.utils.SparkTypeUtil

      def toSparkType(dataTp: DataType, isSum: Boolean = false): org.apache.spark.sql.types.DataType = {
          dataTp.getName match {
            // org.apache.spark.sql.catalyst.expressions.aggregate.Sum#resultType
            case "decimal" =>
              if (isSum) {
                val i = dataTp.getPrecision + 10
                DecimalType(Math.min(DecimalType.MAX_PRECISION, i), dataTp.getScale)
              }
              else DecimalType(dataTp.getPrecision, dataTp.getScale)
            case "date" => DateType
            case "time" => DateType
            case "timestamp" => TimestampType
            case "datetime" => DateType
            case "tinyint" => if (isSum) LongType else ByteType
            case "smallint" => if (isSum) LongType else ShortType
            case "integer" => if (isSum) LongType else IntegerType
            case "int4" => if (isSum) LongType else IntegerType
            case "bigint" => LongType
            case "long8" => LongType
            case "float" => if (isSum) DoubleType else FloatType
            case "double" => DoubleType
            case tp if tp.startsWith("varchar") => StringType
            case tp if tp.startsWith("char") => StringType
            case "dim_dc" => LongType
            case "boolean" => BooleanType
            case tp if tp.startsWith("hllc") => BinaryType
            case tp if tp.startsWith("bitmap") => BinaryType
            case tp if tp.startsWith("extendedcolumn") => BinaryType
            case tp if tp.startsWith("percentile") => BinaryType
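            // Note: RAW is mapped to BinaryType unconditionally, which does not
            // match the child data type (e.g. decimal(19,4)) written at build time.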
            case tp if tp.startsWith("raw") => BinaryType
            case _ => throw new IllegalArgumentException(dataTp.toString)
          }
        }
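
      If TableScanPlan is to agree with the build side, the "raw" branch needs
      access to the measure's child column type, which toSparkType does not
      receive today. A minimal sketch of that direction, assuming the caller
      could supply the child type (the rawSparkType helper and its rawChildType
      parameter are invented for illustration, not Kylin API):

      import org.apache.spark.sql.types.{BinaryType, DataType => SparkDataType, DecimalType}

      // Hypothetical replacement for the `case tp if tp.startsWith("raw")` branch:
      // use the child column's Spark type when it is known, so the scan schema
      // matches what CuboidAggregator's max() wrote to Parquet.
      def rawSparkType(rawChildType: Option[SparkDataType]): SparkDataType =
        rawChildType.getOrElse(BinaryType) // keep today's BinaryType as a fallback

      rawSparkType(Some(DecimalType(19, 4))) // -> DecimalType(19,4), matching the build side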
      

          People

            Assignee: zhaoliu4 Liu Zhao
            Reporter: zhaoliu4 Liu Zhao