HUDI-5977: Fix Date to String casts when non-vectorized readers are used


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved

    Description

      When a Date -> String type conversion is performed on a column and the non-vectorized reader is used, the table becomes unreadable.
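
      The stack trace below shows the failure mechanism: the generated Cast from DateType to StringType carries no time zone, so TimeZoneAwareExpression.zoneId hits None.get during code generation. A minimal sketch of that failure mode, assuming Spark 3.1's internal catalyst API (the Cast.scala line numbers in the trace match that version):

      import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast}
      import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
      import org.apache.spark.sql.types.{DateType, StringType}

      val dateCol = BoundReference(0, DateType, nullable = true)

      // Built without a time zone: codegen walks castToStringCode -> dateFormatter
      // -> zoneId and throws java.util.NoSuchElementException: None.get
      val failing = Cast(dateCol, StringType, timeZoneId = None)
      // failing.genCode(new CodegenContext)   // reproduces the None.get in the trace below

      // Built the way resolved plans build it, with a session time zone attached:
      val working = Cast(dateCol, StringType, timeZoneId = Some("UTC"))
      working.genCode(new CodegenContext)      // generates code successfully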

       

      Test case to replicate this issue:

       

      test("Test DATE to STRING conversions when vectorized reading is not enabled") {
        val tableName = generateTableName
        spark.sql(
          s"""
             | create table $tableName (
             |  id int,
             |  name string,
             |  price double,
             |  ts long
             |) using hudi
             | partitioned by (ts)
             |tblproperties (
             |  primaryKey = 'id'
             |)
           """.stripMargin)
        spark.sql(
          s"""
             | insert into $tableName
             | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
            """.stripMargin)
      spark.sql("set hoodie.schema.on.read.enable = true")
      // adding a struct column to force reads to use non-vectorized readers
        spark.sql(s"alter table $tableName add column (`new_struct_col` STRUCT<f0: INTEGER, f1: STRING>)")
        spark.sql(
          s"""
             | insert into $tableName
             | values (2, 'a2', 20, struct(2, 'f_2'), 1001)
            """.stripMargin)  spark.sql(s"alter table $tableName add column (`date_to_string_col` date)")
        spark.sql(
          s"""
             | insert into $tableName
             | values (3, 'a3', 30, struct(3, 'f_3'), date '2023-03-22', 1002)
            """.stripMargin)
        spark.sql(s"alter table $tableName alter column `date_to_string_col` type string")
        // struct and string (converted from date) column must be read to ensure that non-vectorized reader is used
      checkAnswer(s"select id, name, price, new_struct_col, date_to_string_col, ts from $tableName")(
        Seq(1, "a1", 10.0, null, null, 1000),
        Seq(2, "a2", 20.0, Row(2, "f_2"), null, 1001),
        Seq(3, "a3", 30.0, Row(3, "f_3"), "2023-03-22", 1002)
      )
      }
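
      Note: Spark 3.1's vectorized Parquet reader does not support nested column types, which is why adding new_struct_col pushes the scan onto the row-based (non-vectorized) path. Assuming vectorization is the only variable, the same failure should also be reachable by disabling the vectorized reader directly:

      spark.sql("set spark.sql.parquet.enableVectorizedReader = false")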

       

      Stacktrace

      Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 32) (10.2.174.68 executor driver): java.util.NoSuchElementException: None.get
          at scala.None$.get(Option.scala:529)
          at scala.None$.get(Option.scala:527)
          at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId(datetimeExpressions.scala:53)
          at org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression.zoneId$(datetimeExpressions.scala:53)
          at org.apache.spark.sql.catalyst.expressions.CastBase.zoneId$lzycompute(Cast.scala:254)
          at org.apache.spark.sql.catalyst.expressions.CastBase.zoneId(Cast.scala:254)
          at org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter$lzycompute(Cast.scala:297)
          at org.apache.spark.sql.catalyst.expressions.CastBase.dateFormatter(Cast.scala:297)
          at org.apache.spark.sql.catalyst.expressions.CastBase.castToStringCode(Cast.scala:1059)
          at org.apache.spark.sql.catalyst.expressions.CastBase.nullSafeCastFunction(Cast.scala:871)
          at org.apache.spark.sql.catalyst.expressions.CastBase.doGenCode(Cast.scala:854)
          at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:146)
          at scala.Option.getOrElse(Option.scala:189)
          at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:141)
          at org.apache.spark.sql.catalyst.expressions.CastBase.genCode(Cast.scala:848)
          at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$generateExpressions$1(CodeGenerator.scala:1187)
          at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
          at scala.collection.immutable.List.foreach(List.scala:392)
          at scala.collection.TraversableLike.map(TraversableLike.scala:238)
          at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
          at scala.collection.immutable.List.map(List.scala:298)
          at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1187)
          at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:290)
          at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:338)
          at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:331)
          at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:34)
          at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1278)
          at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1275)
          at org.apache.spark.sql.execution.datasources.parquet.Spark31HoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark31HoodieParquetFileFormat.scala:345)
          at org.apache.hudi.HoodieDataSourceHelper$.$anonfun$buildHoodieParquetReader$1(HoodieDataSourceHelper.scala:67)
          at org.apache.hudi.HoodieBaseRelation.$anonfun$createBaseFileReader$1(HoodieBaseRelation.scala:603)
          at org.apache.hudi.HoodieBaseRelation$BaseFileReader.apply(HoodieBaseRelation.scala:665)
          at org.apache.hudi.BaseFileOnlyRelation.$anonfun$composeRDD$1(BaseFileOnlyRelation.scala:103)
          at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
          at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
          at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
          at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
          at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
          at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
          at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
          at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
          at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
          at org.apache.spark.scheduler.Task.run(Task.scala:131)
          at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
          at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1465)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:750) 
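
      The frames above implicate the projection generated in Spark31HoodieParquetFileFormat.buildReaderWithPartitionValues (line 345): the schema-evolution Cast is constructed with timeZoneId = None, so it can never resolve a zone. A plausible shape for the fix, sketched under that assumption (castWithSessionTimeZone is a hypothetical helper, not the actual patch):

      import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
      import org.apache.spark.sql.internal.SQLConf
      import org.apache.spark.sql.types.DataType

      // Hypothetical helper: when casting a file column (e.g. Date) to the evolved
      // table type (e.g. String), attach the session time zone so that
      // TimeZoneAwareExpression.zoneId is defined during code generation.
      def castWithSessionTimeZone(fromAttr: Attribute, to: DataType): Expression =
        Cast(fromAttr, to, Some(SQLConf.get.sessionLocalTimeZone))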

People

    Assignee: voonhous voon
    Reporter: voonhous voon