Apache Hudi / HUDI-2487

An empty message in Kafka causes a task exception


Details

    Description

      Problem:

            When I use DeltaStreamer in UPSERT mode to ingest JSON data from Kafka into a Hudi table (with Hive sync enabled), the task throws an exception if the value of a Kafka message is null.

      Exception description:

      Lost task 0.1 in stage 2.0 (TID 24, node-group-1UtpO.1f562475-6982-4b16-a50d-d19b0ebff950.com, executor 6): org.apache.hudi.exception.HoodieException: The value of tmSmp can not be null
      at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:463)
      at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$d62e16$1(DeltaSync.java:389)
      at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
      at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:196)
      at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
      at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:58)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
      at org.apache.spark.scheduler.Task.run(Task.scala:123)
      at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:413)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1551)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)

      Task settings:

      hoodie.datasource.write.precombine.field=tmSmp
      hoodie.datasource.write.recordkey.field=subOrderId,activityId,ticketId
      hoodie.datasource.hive_sync.partition_fields=db,dt
      hoodie.datasource.write.partitionpath.field=db:SIMPLE,dt:SIMPLE
      hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
      hoodie.datasource.hive_sync.enable=true
      hoodie.datasource.meta.sync.enable=true
      hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
      hoodie.datasource.hive_sync.support_timestamp=true
      hoodie.datasource.hive_sync.auto_create_database=true
      hoodie.meta.sync.client.tool.class=org.apache.hudi.hive.HiveSyncTool
      hoodie.datasource.hive_sync.base_file_format=PARQUET
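
The precombine field configured above (tmSmp) is what DeltaStreamer uses to order records that share a record key, so every record must carry a non-null tmSmp. The Java sketch below is a simplified in-memory model of that step, not Hudi code: `Row`, `orderingValue` and `precombine` are hypothetical names, and the "largest ordering value wins" rule is an assumption about the default payload behaviour. It shows how a single row with a null tmSmp fails the whole batch, mirroring the exception above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrecombineSketch {

    // Hypothetical stand-in for one incoming row: record key, ordering field tmSmp, payload.
    record Row(String recordKey, Long tmSmp, String payload) {}

    // Reads the ordering value and rejects null, like the check that fails in the trace.
    static long orderingValue(Row row) {
        if (row.tmSmp() == null) {
            throw new IllegalStateException("The value of tmSmp can not be null");
        }
        return row.tmSmp();
    }

    // For each record key keep the row with the largest tmSmp; on a tie the later row wins.
    static List<Row> precombine(List<Row> rows) {
        Map<String, Row> latest = new LinkedHashMap<>();
        for (Row row : rows) {
            long ts = orderingValue(row); // every row is checked, so one null fails the batch
            latest.merge(row.recordKey(), row,
                (existing, incoming) -> orderingValue(existing) > ts ? existing : incoming);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row("k1", 100L, "v1"),
            new Row("k1", 200L, "v2"),
            new Row("k2", 150L, "v3"));
        for (Row r : precombine(rows)) {
            System.out.println(r.recordKey() + " -> " + r.payload()); // k1 -> v2, then k2 -> v3
        }
        // A row whose tmSmp is null, e.g. one built from an empty Kafka message, throws.
        try {
            precombine(List.of(new Row("k3", null, null)));
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In this model the null check runs before any merging, so there is no partial result: one bad row aborts the whole batch, which matches the task-level failure reported here.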
      

       

       

      spark-submit parameters:

      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      --source-ordering-field tmSmp \
      --table-type MERGE_ON_READ \
      --target-table ${TABLE_NAME} \
      --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
      --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --enable-sync \
      --op UPSERT \
      --continuous \

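             The stack trace shows that the task fails in DeltaSync.readFromSource, where HoodieAvroUtils.getNestedFieldVal reads the ordering (precombine) field tmSmp: a record produced from a null Kafka message has a null tmSmp, so a single such message fails the whole task. I suggest making DeltaStreamer tolerate this case, for example by filtering out Kafka messages with a null value before they are converted into records.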
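
A minimal sketch of the suggested filter, in Java: drop messages whose value is null or blank before they are parsed as JSON. `KafkaMessage` is a hypothetical stand-in for Kafka's `ConsumerRecord<String, String>` (whose `value()` returns null for a message without a body), and `nonEmptyValues` is an illustrative helper, not an existing Hudi or Kafka API.

```java
import java.util.List;
import java.util.stream.Collectors;

public class NullValueFilterSketch {

    // Hypothetical stand-in for a Kafka ConsumerRecord<String, String>: key, value, offset.
    record KafkaMessage(String key, String value, long offset) {}

    // Keep only messages that carry a non-blank JSON body; null values and
    // empty/whitespace-only strings are dropped before JSON-to-Avro conversion.
    static List<String> nonEmptyValues(List<KafkaMessage> messages) {
        return messages.stream()
            .map(KafkaMessage::value)
            .filter(v -> v != null && !v.isBlank())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<KafkaMessage> batch = List.of(
            new KafkaMessage("k1", "{\"subOrderId\":\"o1\",\"tmSmp\":100}", 0L),
            new KafkaMessage("k2", null, 1L),  // null value, as described in this issue
            new KafkaMessage("k3", "", 2L));   // empty body
        System.out.println(nonEmptyValues(batch)); // prints [{"subOrderId":"o1","tmSmp":100}]
    }
}
```

In DeltaStreamer such a check would presumably sit where JsonKafkaSource extracts message values. Whether to drop these messages silently or to count and log their offsets is a design choice this sketch leaves open; logging would make skipped data visible instead of hiding it.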

       

            People

              qianchutao
              Votes: 0
              Watchers: 3

                Time Tracking

                  Original Estimate: 24h
                  Remaining Estimate: 24h
                  Time Spent: Not Specified