Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Fixed
None
Description
Question:
When I use DeltaStreamer in upsert mode to update Hive tables in Hudi from JSON data in Kafka, the task throws an exception if the value of a Kafka message is null.
Exception description:
Lost task 0.1 in stage 2.0 (TID 24, node-group-1UtpO.1f562475-6982-4b16-a50d-d19b0ebff950.com, executor 6): org.apache.hudi.exception.HoodieException: The value of tmSmp can not be null
at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:463)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$d62e16$1(DeltaSync.java:389)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:196)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:58)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:413)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1551)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Task settings:
hoodie.datasource.write.precombine.field=tmSmp
hoodie.datasource.write.recordkey.field=subOrderId,activityId,ticketId
hoodie.datasource.hive_sync.partition_fields=db,dt
hoodie.datasource.write.partitionpath.field=db:SIMPLE,dt:SIMPLE
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.meta.sync.enable=true
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.support_timestamp=true
hoodie.datasource.hive_sync.auto_create_database=true
hoodie.meta.sync.client.tool.class=org.apache.hudi.hive.HiveSyncTool
hoodie.datasource.hive_sync.base_file_format=PARQUET
spark-submit script parameters:
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --source-ordering-field tmSmp \
--table-type MERGE_ON_READ \
--target-table ${TABLE_NAME} \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--enable-sync \
--op UPSERT \
--continuous \
So I think some optimization could be made to keep the task from failing, such as filtering out Kafka messages whose value is null.
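A minimal sketch of the kind of filtering I have in mind. This is not Hudi code: `NullValueFilter` and `dropNullValues` are hypothetical names, and each `Map.Entry` only stands in for a Kafka message (key and JSON value). In Kafka, a message with a null value is typically a tombstone, so there is no payload from which `tmSmp` could be read.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class NullValueFilter {

    // Hypothetical helper: keeps only the messages whose value is non-null,
    // so that later steps never see a record without a payload.
    static List<Map.Entry<String, String>> dropNullValues(
            List<Map.Entry<String, String>> messages) {
        return messages.stream()
                .filter(m -> m.getValue() != null)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-in batch: two JSON messages and one message with a null value.
        List<Map.Entry<String, String>> batch = new ArrayList<>();
        batch.add(new SimpleEntry<String, String>("k1", "{\"tmSmp\": 1}"));
        batch.add(new SimpleEntry<String, String>("k2", null));
        batch.add(new SimpleEntry<String, String>("k3", "{\"tmSmp\": 3}"));

        System.out.println(dropNullValues(batch).size()); // prints 2
    }
}
```

In the real job the same predicate would presumably be applied to the records read from Kafka before they are converted and the ordering field is looked up; where exactly that belongs in JsonKafkaSource or DeltaSync is an assumption on my side.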