Uploaded image for project: 'Apache Hudi'
  1. Apache Hudi
  2. HUDI-7143

Schema evolution triggers a CDC query exception

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Critical
    • Resolution: Unresolved
    • None
    • None
    • spark

    Description

      sparkSession.sql("CREATE TABLE if not exists hudi_ut_schema_evolution (id INT, version INT, name STRING, birthDate TIMESTAMP, inc_day STRING) USING HUDI PARTITIONED BY (inc_day) TBLPROPERTIES (hoodie.table.cdc.enabled='true', type='cow', primaryKey='id')");
      
      20231127201042503.commit:
      sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select 1 as id, 1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as timestamp) as birthDate, '2023-10-01' as inc_day) s  on t.id=s.id when matched THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *; ");
      
      20231127201113131.commit:
      sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution ADD COLUMNS (add1 String AFTER id); ");
      
      20231127201124255.commit:
      sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select '1' as add1, 2 as id, 1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as timestamp) as birthDate, '2023-10-01' as inc_day) s  on t.id=s.id when matched THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *; ");
      
      20231127201146659.commit:
      sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution DROP COLUMN add1");
      
      20231127201157382.commit:
      sparkSession.sql("ALTER TABLE hudi_ut_schema_evolution ADD COLUMNS (add1 int)");
      
      20231127201208532.commit:
      sparkSession.sql("merge into hudi_ut_schema_evolution t using ( select 1 as add1, 3 as id, 1 as version, 'str_1' as name, cast('2023-01-01 12:12:12.0' as timestamp) as birthDate, '2023-10-01' as inc_day) s  on t.id=s.id when matched THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *; ");
      
      sparkSession.sql("select * from hudi_ut_schema_evolution").show(100, false);
      +-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
      |_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                        |id |version|name |birthDate          |add1|inc_day   |
      +-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
      |20231127201042503  |20231127201042503_0_0|1                 |inc_day=2023-10-01    |2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|1  |1      |str_1|2023-01-01 12:12:12|null|2023-10-01|
      |20231127201124255  |20231127201124255_0_1|2                 |inc_day=2023-10-01    |2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|2  |1      |str_1|2023-01-01 12:12:12|null|2023-10-01|
      |20231127201208532  |20231127201208532_0_2|3                 |inc_day=2023-10-01    |2fe30d70-daa3-4ebc-8dab-313116e1f8f3-0_0-103-89_20231127201208532.parquet|3  |1      |str_1|2023-01-01 12:12:12|1   |2023-10-01|
      +-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------+-----+-------------------+----+----------+
      
      sparkSession.sql("select * from hudi_table_changes('hudi_ut_schema_evolution','cdc','20231127201042503','20231127201208532')").show(100, false);
      The CDC query then fails with the following exception:
      org.apache.avro.AvroTypeException: Found string, expecting union
      	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
      	at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
      	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
      	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
      	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
      	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
      	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
      	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
      	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
      	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
      	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
      	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
      	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
      	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
      	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
      	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
      	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:196)
      	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:146)
      	at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
      	at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
      	at org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator.hasNext(HoodieCDCLogRecordIterator.java:77)
      	at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNextInternal(HoodieCDCRDD.scala:266)
      	at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNext(HoodieCDCRDD.scala:275)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
      	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
      	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
      	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
      	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      	at org.apache.spark.scheduler.Task.run(Task.scala:131)
      	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
      	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:750)
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            loukey_j loukey_j
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: