Spark / SPARK-40409

IncompatibleSchemaException when BYTE stored from DataFrame to Avro is read using spark-sql


Details

    • Type: Bug
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.2.1
    • Fix Version/s: None
    • Component/s: Input/Output
    • Labels: None

    Description

      Describe the bug

      We are trying to store the BYTE value -128 in a table created via a Spark DataFrame, using the Avro file format. Creating the table and inserting this value succeeds without error; however, a SELECT query on the table through spark-sql fails with an IncompatibleSchemaException, as shown below:

      2022-09-09 21:15:03,248 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
      org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro type {"type":"record","name":"topLevelRecord","fields":[{"name":"c1","type":["int","null"]}]} to SQL type STRUCT<`c1`: TINYINT>

      Steps to reproduce

      On Spark 3.2.1 (commit 4f25b3f712), using spark-shell with the Avro package:

      ./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1

      Execute the following:

      import org.apache.spark.sql.{Row, SparkSession}
      import org.apache.spark.sql.types._
      val rdd = sc.parallelize(Seq(Row(("-128").toByte)))
      val schema = new StructType().add(StructField("c1", ByteType, true))
      val df = spark.createDataFrame(rdd, schema)
      df.show(false)
      df.write.mode("overwrite").format("avro").saveAsTable("byte_avro")
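
      To see where the schemas diverge, the files that saveAsTable just wrote can be read back through the generic Avro reader, which infers its schema from the files rather than from the catalog. This is only a diagnostic sketch; the spark-warehouse/byte_avro path assumes the default warehouse location and may differ in a Hive-backed setup:

      // Diagnostic sketch (path is an assumption): read the written Avro files
      // directly, so the schema comes from the files, not from the catalog.
      val direct = spark.read.format("avro").load("spark-warehouse/byte_avro")
      direct.printSchema()   // per the "int" in the error above, c1 comes back as integer
      direct.show(false)     // the value itself is still -128

      The catalog, on the other hand, still records c1 as TINYINT (the error message shows both sides), so the table schema and the file schema no longer agree.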

      On Spark 3.2.1 (commit 4f25b3f712), using spark-sql with the Avro package:

      ./bin/spark-sql --packages org.apache.spark:spark-avro_2.12:3.2.1

      Execute the following:

      spark-sql> select * from byte_avro;
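
      This query fails with the IncompatibleSchemaException shown above. Back in the spark-shell session, one possible stop-gap (a sketch only, and it gives up the TINYINT catalog type; the byte_avro_int table name is arbitrary) is to widen the column to INT before writing, so that the catalog schema and the Avro file schema agree:

      // Workaround sketch (hypothetical table name): widen c1 to INT on the
      // write side so the catalog and the Avro files both record int.
      import org.apache.spark.sql.functions.col
      df.select(col("c1").cast("int").as("c1"))
        .write.mode("overwrite").format("avro").saveAsTable("byte_avro_int")
      // select * from byte_avro_int then returns -128, but as an INT rather
      // than the original TINYINT.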

      Expected behavior

      We expect the output of the SELECT query to be -128. Additionally, we expect the data type to be preserved (here it is widened from BYTE/TINYINT to INT, hence the mismatch). We tried other formats such as ORC, and the outcome is consistent with this expectation. Here are the logs from our attempt at doing the same with ORC:

      scala> df.write.mode("overwrite").format("orc").saveAsTable("byte_orc")
      2022-09-09 21:38:28,880 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
      2022-09-09 21:38:28,880 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
      2022-09-09 21:38:34,642 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
      2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
      2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
      2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
      scala> spark.sql("select * from byte_orc;")
      res2: org.apache.spark.sql.DataFrame = [c1: tinyint]
      scala> spark.sql("select * from byte_orc;").show(false)
      +----+
      |c1  |
      +----+
      |-128|
      +----+
      

      Root Cause

      AvroSerializer

         (catalystType, avroType.getType) match {
            case (NullType, NULL) =>
              (getter, ordinal) => null
            case (BooleanType, BOOLEAN) =>
              (getter, ordinal) => getter.getBoolean(ordinal)
            case (ByteType, INT) =>
              (getter, ordinal) => getter.getByte(ordinal).toInt
            case (ShortType, INT) =>
              (getter, ordinal) => getter.getShort(ordinal).toInt
            case (IntegerType, INT) =>
              (getter, ordinal) => getter.getInt(ordinal)

      AvroDeserializer

          (avroType.getType, catalystType) match {
            case (NULL, NullType) => (updater, ordinal, _) =>
              updater.setNullAt(ordinal)
            // TODO: we can avoid boxing if future version of avro provide primitive accessors.
            case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
              updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
            case (INT, IntegerType) => (updater, ordinal, value) =>
              updater.setInt(ordinal, value.asInstanceOf[Int])
            case (INT, DateType) => (updater, ordinal, value) =>
              updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
      

      AvroSerializer widens Spark's ByteType to Avro's INT when writing. On the read path, AvroDeserializer only maps Avro's INT back to Spark's IntegerType (or DateType). Because the table's catalog schema still declares the user-specified ByteType, the deserializer finds no (INT, ByteType) case and raises the IncompatibleSchemaException shown above. This mismatch is the root cause of the issue.
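
      One conceivable fix (a sketch only, not an actual patch for this ticket) would be to mirror the serializer's widening on the read path by adding narrowing cases to AvroDeserializer's match. The setByte/setShort calls below assume the updater interface exposes them, as InternalRow does; otherwise the generic set(ordinal, value) would be needed:

          // Sketch of a possible AvroDeserializer change (not the actual patch):
          // narrow Avro INT back to the Catalyst type declared in the catalog,
          // mirroring the widening AvroSerializer performs on write.
          case (INT, ByteType) => (updater, ordinal, value) =>
            updater.setByte(ordinal, value.asInstanceOf[Int].toByte)
          case (INT, ShortType) => (updater, ordinal, value) =>
            updater.setShort(ordinal, value.asInstanceOf[Int].toShort)

      Narrowing on read would keep the on-disk Avro schema unchanged; values written by Spark from a ByteType column always fit in a Byte, since they were widened from it in the first place.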

          People

            Assignee: Unassigned
            Reporter: xsys