Apache Hudi / HUDI-7388

[Regression] Records with a field of logical type decimal can no longer be ingested via HoodieStreamer


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.0.0-beta1, 0.14.1, 0.15.0, 1.0.0-beta2
    • Fix Version/s: 0.16.0
    • Component/s: deltastreamer
    • Labels: None

    Description

      Problem

      When attempting to ingest records with an Avro target schema that includes a field using the decimal logical type in Hudi 0.14.1, the following exception is thrown (a unit-level sketch of the failure follows the stack trace):

      24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
      java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
              at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
              at org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
              at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
              at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
              at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
              at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
              at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
              at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
              at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
              at org.apache.spark.scheduler.Task.run(Task.scala:139)
              at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
              at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
              at java.base/java.lang.Thread.run(Thread.java:840)
      Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
              at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
              at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
              at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
              at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
              at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
              at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
              at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
              at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
              at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
              at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
              at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
              ... 21 more 
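
      Note the asymmetry in the error: the "old" schema is the bytes-backed decimal exactly as declared in schema.avsc, while the "new" schema is a size-2 fixed-backed decimal, which suggests the target schema passes through a conversion that re-encodes decimals as fixed before the record rewrite. The failure can be sketched at the unit level; the following is an illustrative, untested sketch that assumes Avro 1.11 on the classpath and the two-argument rewriteRecordWithNewSchema overload from Hudi 0.14.x, with schema names mirroring the exception message:

      import java.math.BigDecimal;
      import java.util.Collections;

      import org.apache.avro.Conversions;
      import org.apache.avro.LogicalTypes;
      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericData;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.hudi.avro.HoodieAvroUtils;

      public class DecimalRewriteRepro {
        public static void main(String[] args) {
          // decimal(4,2) backed by bytes, as declared in schema.avsc
          Schema bytesDecimal = LogicalTypes.decimal(4, 2)
              .addToSchema(Schema.create(Schema.Type.BYTES));
          // decimal(4,2) backed by a size-2 fixed, as reported in the exception
          Schema fixedDecimal = LogicalTypes.decimal(4, 2)
              .addToSchema(Schema.createFixed("fixed", null, "stock_ticks.low", 2));

          Schema oldSchema = Schema.createRecord("stock_ticks", null, null, false);
          oldSchema.setFields(Collections.singletonList(
              new Schema.Field("low", bytesDecimal, null)));
          Schema newSchema = Schema.createRecord("stock_ticks", null, null, false);
          newSchema.setFields(Collections.singletonList(
              new Schema.Field("low", fixedDecimal, null)));

          GenericRecord record = new GenericData.Record(oldSchema);
          record.put("low", new Conversions.DecimalConversion()
              .toBytes(new BigDecimal("12.34"), bytesDecimal, bytesDecimal.getLogicalType()));

          // Expected to throw AvroRuntimeException: cannot support rewrite value for
          // schema type: {"type":"fixed",...} since the old schema type is: {"type":"bytes",...}
          HoodieAvroUtils.rewriteRecordWithNewSchema(record, newSchema);
        }
      }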

      Reproduction Steps

      1. Set up a clean Spark install

      mkdir /tmp/hudi-decimal-repro
      cd /tmp/hudi-decimal-repro
      tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz

      2. Create a minimal schema file based on the demo schema. The only change is the type of the field named low (a quick parse check of the resulting file follows its contents below).

      echo '{
        "type": "record",
        "name": "stock_ticks",
        "fields": [{
          "name": "volume",
          "type": "long"
        }, {
          "name": "ts",
          "type": "string"
        }, {
          "name": "symbol",
          "type": "string"
        }, {
          "name": "year",
          "type": "int"
        }, {
          "name": "month",
          "type": "string"
        }, {
          "name": "high",
          "type": "double"
        }, {
          "name": "low",
          "type": {
            "type": "bytes",
            "logicalType": "decimal",
            "precision": 4,
            "scale": 2
          }
        }, {
          "name": "key",
          "type": "string"
        }, {
          "name": "date",
          "type": "string"
        }, {
          "name": "close",
          "type": "double"
        }, {
          "name": "open",
          "type": "double"
        }, {
          "name": "day",
          "type": "string"
        }]
      }' > schema.avsc
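
      A quick way to confirm that the file declares the bytes-backed decimal (an optional sanity check; it assumes the Avro library is on the classpath, and the class name here is purely illustrative):

      import java.io.File;
      import java.io.IOException;

      import org.apache.avro.Schema;

      public class CheckLowField {
        public static void main(String[] args) throws IOException {
          Schema schema = new Schema.Parser()
              .parse(new File("/tmp/hudi-decimal-repro/schema.avsc"));
          Schema low = schema.getField("low").schema();
          // Prints: BYTES decimal
          System.out.println(low.getType() + " " + low.getLogicalType().getName());
        }
      }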

      3. Create a minimal properties file

      echo "hoodie.datasource.write.recordkey.field=key
      hoodie.datasource.write.partitionpath.field=date
      hoodie.table.recordkey.fields=key
      hoodie.table.partition.fields=date
      hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc
      hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc
      hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" > decimal-repro.properties

      4. Copy the data file from the Docker demo

      mkdir data
      cd data
      wget https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json 
      cd ..

      5. Run HoodieStreamer. The spark-examples jar below serves only as a placeholder application jar; the HoodieStreamer class itself is resolved from the --packages bundles.

      spark-3.4.2-bin-hadoop3/bin/spark-submit \
         --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.15.0,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0 \
         --conf spark.kryoserializer.buffer.max=200m \
         --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
         --class org.apache.hudi.utilities.streamer.HoodieStreamer \
         spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
         --table-type COPY_ON_WRITE \
         --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
         --target-base-path /tmp/hudi-decimal-repro/table \
         --target-table table \
         --props /tmp/hudi-decimal-repro/decimal-repro.properties \
         --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider 

      Expected Results

      The command runs successfully, data is ingested into /tmp/hudi-decimal-repro/table, and files exist under /tmp/hudi-decimal-repro/table/2018/08/31/.

      Actual Results

      The command fails with the exception above; no data is ingested into the table, and the table is left with a hanging commit in the requested state.

      Logs of the attempted run are attached as spark.log.

      Additional Information

      Based on my own testing, this issue does not appear in versions 0.12.2 through 0.14.0. It does affect the 0.14.1, 0.15.0, 1.0.0-beta1, and 1.0.0-beta2 releases.

      Attachments

        1. decimal-repro.properties
          0.4 kB
          Brandon Dahler
        2. schema.avsc
          0.7 kB
          Brandon Dahler
        3. spark.log
          238 kB
          Brandon Dahler


          People

            Assignee: Unassigned
            Reporter: Brandon Dahler (brandon.dahler.amazon)
            Votes: 0
            Watchers: 1
