Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.0.0-beta1, 0.14.1, 0.15.0, 1.0.0-beta2
Fix Version/s: None
Description
Problem
When attempting to ingest records with an Avro target schema that includes a field using the decimal logical type, Hudi 0.14.1 throws an exception:
24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    at org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
    at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
    at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
    at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
    ... 21 more
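For context on the error message: Avro's decimal logical type can be carried by either a bytes or a fixed underlying type, and the two use different runtime representations (ByteBuffer vs. GenericFixed). The schema file used below declares low as a bytes-backed decimal, while the failing rewrite expects a fixed-backed one. A minimal plain-Avro sketch of the two encodings (no Hudi involved; the class name and value are illustrative only):

import java.math.BigDecimal;
import java.nio.ByteBuffer;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericFixed;

// Sketch only: shows the two physical encodings of decimal(4,2) named in the exception.
public class DecimalEncodings {
    public static void main(String[] args) {
        LogicalType decimal = LogicalTypes.decimal(4, 2);

        // decimal(4,2) backed by "bytes" -- what the schema file in this report declares
        Schema bytesSchema = decimal.addToSchema(Schema.create(Schema.Type.BYTES));

        // decimal(4,2) backed by "fixed" -- what the failing rewrite expects, per the exception
        Schema fixedSchema = decimal.addToSchema(
            Schema.createFixed("fixed", null, "stock_ticks.low", 2));

        Conversions.DecimalConversion conv = new Conversions.DecimalConversion();
        BigDecimal value = new BigDecimal("12.34");

        // The same logical value ends up with two different runtime representations:
        ByteBuffer asBytes = conv.toBytes(value, bytesSchema, decimal);   // ByteBuffer
        GenericFixed asFixed = conv.toFixed(value, fixedSchema, decimal); // GenericFixed

        System.out.println("bytes length: " + asBytes.remaining());
        System.out.println("fixed size:   " + asFixed.bytes().length);
        // A ByteBuffer produced for the bytes-backed schema cannot simply be
        // reinterpreted as a GenericFixed, which is the conversion
        // rewritePrimaryTypeWithDiffSchemaType reports it cannot perform.
    }
}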
Reproduction Steps
1. Set up a clean Spark install
mkdir /tmp/hudi-decimal-repro
cd /tmp/hudi-decimal-repro
tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz
2. Create a minimal schema file based on the demo schema. The only change is the type of the field named low. (An optional sanity check for this file is sketched after step 5.)
echo '{ "type":"record", "name":"stock_ticks", "fields":[{ "name": "volume", "type": "long" }, { "name": "ts", "type": "string" }, { "name": "symbol", "type": "string" },{ "name": "year", "type": "int" },{ "name": "month", "type": "string" },{ "name": "high", "type": "double" },{ "name": "low", "type": { "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2 } },{ "name": "key", "type": "string" },{ "name": "date", "type":"string" }, { "name": "close", "type": "double" }, { "name": "open", "type": "double" }, { "name": "day", "type":"string" } ]}' > schema.avsc
3. Create a minimal properties file
echo "hoodie.datasource.write.recordkey.field=key hoodie.datasource.write.partitionpath.field=date hoodie.table.recordkey.fields=key hoodie.table.partition.fields=date hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" > decimal-repro.properties
4. Copy the data file from the Docker demo
mkdir data
cd data
wget https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json
cd ..
5. Run HoodieStreamer
spark-3.4.2-bin-hadoop3/bin/spark-submit \
  --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.15.0,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0 \
  --conf spark.kryoserializer.buffer.max=200m \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
  --target-base-path /tmp/hudi-decimal-repro/table \
  --target-table table \
  --props /tmp/hudi-decimal-repro/decimal-repro.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
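As an optional sanity check (referenced in step 2), the schema file can be parsed with plain Avro to confirm that low really is a bytes-backed decimal(4,2). A small sketch, assuming the paths used above; the class name is illustrative:

import java.io.File;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// Sketch only: parse the schema written in step 2 and inspect the "low" field.
public class CheckSchema {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser()
            .parse(new File("/tmp/hudi-decimal-repro/schema.avsc"));
        Schema low = schema.getField("low").schema();

        System.out.println(low.getType());                  // BYTES
        System.out.println(low.getLogicalType().getName()); // decimal

        LogicalTypes.Decimal dec = (LogicalTypes.Decimal) low.getLogicalType();
        System.out.println(dec.getPrecision() + "," + dec.getScale()); // 4,2
    }
}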
Expected Results
The command runs successfully, data is ingested into /tmp/hudi-decimal-repro/table, and data files exist under /tmp/hudi-decimal-repro/table/2018/08/31/.
Actual Results
The command fails with the exception above and no data is ingested into the table. The table is left with a commit stuck in the requested state.
Logs of the attempted run are attached as spark.log.
Additional Information
Based on my own testing, this issue does not appear to exist in versions 0.12.2 through 0.14.0. It affects the 0.14.1, 0.15.0, 1.0.0-beta1, and 1.0.0-beta2 releases.
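For what it's worth, the failing conversion can probably be exercised without Spark by driving the rewrite utility from the stack trace directly. A rough sketch, assuming HoodieAvroUtils.rewriteRecordDeep takes (GenericRecord, Schema), which matches the call visible in the trace but is not confirmed here; record and field names are illustrative:

import java.math.BigDecimal;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

// Sketch only: rewrite a bytes-backed decimal record against a fixed-backed target schema.
public class RewriteDecimalRepro {
    public static void main(String[] args) {
        LogicalType decimal = LogicalTypes.decimal(4, 2);
        Schema bytesDecimal = decimal.addToSchema(Schema.create(Schema.Type.BYTES));
        Schema fixedDecimal = decimal.addToSchema(
            Schema.createFixed("fixed", null, "stock_ticks.low", 2));

        // Source record: "low" carried as a bytes-backed decimal.
        Schema oldSchema = SchemaBuilder.record("stock_ticks").fields()
            .name("low").type(bytesDecimal).noDefault()
            .endRecord();
        GenericRecord record = new GenericData.Record(oldSchema);
        record.put("low", new Conversions.DecimalConversion()
            .toBytes(new BigDecimal("12.34"), bytesDecimal, decimal));

        // Target schema: same field, but as a fixed-backed decimal.
        Schema newSchema = SchemaBuilder.record("stock_ticks").fields()
            .name("low").type(fixedDecimal).noDefault()
            .endRecord();

        // Expected to throw the same AvroRuntimeException:
        // "cannot support rewrite value for schema type ... fixed ...
        //  since the old schema type is ... bytes"
        HoodieAvroUtils.rewriteRecordDeep(record, newSchema);
    }
}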