
HUDI-1906: Deltastreamer/SparkDatasource ingestion breaks when changing target schema provider options


Details

    Description

      Currently, there are a few different options for the user to provide a target schema, such as a file-based schema provider or a schema registry. At a high level, there are two main flows (a minimal configuration sketch follows the list):

      1. The target schema is provided by the user.
      2. The target schema is not provided by the user (and is then inferred from the incoming data).
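      For illustration, a minimal sketch of the two flows with the file-based schema provider, assuming the standard DeltaStreamer schema provider property keys (the .avsc paths are placeholders):

      {code}
      # Flow 1: the user provides the target schema explicitly.
      hoodie.deltastreamer.schemaprovider.source.schema.file=/path/to/source.avsc
      hoodie.deltastreamer.schemaprovider.target.schema.file=/path/to/target.avsc

      # Flow 2: no target schema is supplied; it is inferred from the incoming data.
      hoodie.deltastreamer.schemaprovider.source.schema.file=/path/to/source.avsc
      {code}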

       

      Schema post processor enabled | Transformers | User-provided target schema | Current behavior
      yes                           | no           | yes                         | table schema has no namespace; matches the user-provided schema
      yes                           | yes          | no                          | needed a minor fix in the post processor for an NPE; with the fix, the table schema has a namespace
      yes                           | yes          | yes                         | table schema has a namespace
      no                            | no           | yes                         | table schema has no namespace; matches the user-provided schema
      no                            | yes          | yes                         | table schema has no namespace; matches the user-provided schema
      no                            | yes          | no                          | table schema has a namespace

       

      Source -> https://github.com/apache/hudi/pull/2937

      As the table above shows, if one switches from a non-user-provided-schema flow to a user-provided-schema flow, the table schema goes from having a namespace to having none.

      Parquet does not store the namespace, so the parquet-avro writer and reader do not complain when moving between Avro schemas with and without a namespace.

      However, for MergeOnRead tables we serialize both the data and the schema in the log blocks. The GenericDatumReader, which takes a writer schema and a reader schema and translates between them, breaks when one schema has a namespace and the other does not; a minimal standalone repro is sketched below.
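      As a standalone illustration (outside Hudi; the class and schema names here are made up for the example), the sketch below writes a record whose nested fixed type carries a namespace and then decodes it against an otherwise identical reader schema without one. Avro resolves named types by full name, so the read fails with an AvroTypeException of the same shape as the one below:

      {code:java}
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class NamespaceMismatchRepro {
  public static void main(String[] args) throws Exception {
    // Writer schema: the nested fixed type HAS a namespace (as when the
    // schema is inferred from incoming data).
    Schema writer = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[{\"name\":\"height\","
            + "\"type\":{\"type\":\"fixed\",\"name\":\"height_fixed\","
            + "\"namespace\":\"hoodie.source\",\"size\":4}}]}");
    // Reader schema: identical shape, but the fixed type has NO namespace
    // (as in a user-provided schema).
    Schema reader = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[{\"name\":\"height\","
            + "\"type\":{\"type\":\"fixed\",\"name\":\"height_fixed\",\"size\":4}}]}");

    // Serialize one record with the writer schema, mirroring what a log block stores.
    GenericRecord record = new GenericData.Record(writer);
    record.put("height",
        new GenericData.Fixed(writer.getField("height").schema(), new byte[] {1, 2, 3, 4}));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writer).write(record, encoder);
    encoder.flush();

    // Resolving writer against reader treats the two fixed types as distinct
    // named types (their full names differ) and throws AvroTypeException.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    new GenericDatumReader<GenericRecord>(writer, reader).read(null, decoder);
  }
}
      {code}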

       

      The following exception is thrown:

      {code}
      51511 [Executor task launch worker for task 502] ERROR org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner  - Got exception when reading log file
      org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
      	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
      	at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
      	at org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
      	at org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
      	at org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
      	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
      	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
      	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
      	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
      	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
      	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
      	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
      	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
      	at org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
      	at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
      	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
      	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
      	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
      	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
      	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
      	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
      	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
      	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
      	at org.apache.spark.scheduler.Task.run(Task.scala:123)
      	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
      	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      {code}

       This seems like an Avro limitation. We need a way to keep the decoding of Avro data in log files from breaking when the user moves between schema provider options. One way is to implement a custom GenericDatumReader; a rough sketch of one possible direction follows.
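      As one possible direction, here is a minimal, hypothetical sketch (the NamespaceAliasing class and aliasWriterNames method are made up for illustration, and this is not code from the PR above): before resolving, walk the reader schema alongside the writer schema and register the writer's full names as aliases on the reader's named types. Avro's schema resolution applies reader aliases to the writer schema, so the two names are then treated as the same type:

      {code:java}
import org.apache.avro.Schema;

/**
 * Hypothetical helper: makes a reader schema tolerate namespace differences by
 * aliasing the writer's full names onto the reader's named types.
 */
public final class NamespaceAliasing {

  private NamespaceAliasing() {
  }

  /** Recursively adds the writer's full name as an alias wherever it differs. */
  public static void aliasWriterNames(Schema writer, Schema reader) {
    if (writer.getType() != reader.getType()) {
      return; // structurally different; let normal resolution report the error
    }
    switch (reader.getType()) {
      case RECORD:
        if (!writer.getFullName().equals(reader.getFullName())) {
          reader.addAlias(writer.getFullName());
        }
        for (Schema.Field readerField : reader.getFields()) {
          Schema.Field writerField = writer.getField(readerField.name());
          if (writerField != null) {
            aliasWriterNames(writerField.schema(), readerField.schema());
          }
        }
        break;
      case FIXED:
      case ENUM:
        if (!writer.getFullName().equals(reader.getFullName())) {
          reader.addAlias(writer.getFullName());
        }
        break;
      case ARRAY:
        aliasWriterNames(writer.getElementType(), reader.getElementType());
        break;
      case MAP:
        aliasWriterNames(writer.getValueType(), reader.getValueType());
        break;
      default:
        break; // primitives and unions are left to normal resolution
    }
  }
}
      {code}

      A custom GenericDatumReader could run this normalization on the writer/reader pair before building its resolver. Unions and recursive schemas would need the same care before this is anywhere near production-ready.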


          People

            Assignee: sivabalan narayanan (shivnarayan)
            Reporter: Nishith Agarwal (nishith29)
