Apache Hudi / HUDI-6485

Invalid json input record crashes HoodieDeltaStreamer


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.13.1, 0.12.3
    • Fix Version/s: None
    • Component/s: deltastreamer
    • Labels: None

    Description

      With a HoodieDeltaStreamer configured to decode JSON records, a single non-JSON record causes the job to fail with no way to skip it or move past it. ClickHouse's KafkaEngine solves this with an optional "skip invalid records" mode, which seems like it would be helpful here as well.

      It's not clear to me whether `--commit-on-errors` is supposed to address this, but it appears not to in practice.
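      A skip-invalid mode could amount to catching the parse exception per record instead of letting it abort the Spark task. The sketch below illustrates the idea with a hypothetical `convert` stand-in for `MercifulJsonConverter.convert` (the real converter takes a schema and produces an Avro record); `convertSkippingInvalid` is an illustrative name, not existing Hudi API:

      ```java
      import java.util.ArrayList;
      import java.util.List;

      public class SkipInvalidSketch {
          // Hypothetical stand-in for MercifulJsonConverter.convert: rejects
          // anything that is not a JSON object, simulating the JsonParseException
          // thrown for inputs like "testvalue".
          static String convert(String record) {
              String t = record.trim();
              if (!t.startsWith("{") || !t.endsWith("}")) {
                  throw new RuntimeException("Unrecognized token: " + record);
              }
              return t;
          }

          // Skip-invalid mode: a failed conversion drops the record instead of
          // failing the whole stage.
          static List<String> convertSkippingInvalid(List<String> records) {
              List<String> out = new ArrayList<>();
              for (String r : records) {
                  try {
                      out.add(convert(r));
                  } catch (RuntimeException e) {
                      // A real implementation would log and/or bump an error metric here.
                      System.err.println("Skipping invalid record: " + r);
                  }
              }
              return out;
          }

          public static void main(String[] args) {
              List<String> in = List.of("{\"k\": 1}", "testvalue", "{\"k\": 2}");
              System.out.println(convertSkippingInvalid(in).size()); // 2 valid records survive
          }
      }
      ```

      In the actual DeltaStreamer this catch would have to live inside the map function that wraps `AvroConvertor.fromJson`, since the exception is thrown per element on the executors.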

      On failure, the following stack trace is thrown and the process exits:

      23/07/03 14:49:23 INFO DAGScheduler: ShuffleMapStage 2 (mapToPair at HoodieJavaRDD.java:135) failed in 2144.592 s due to Job aborted due to stage failure: Task 14 in stage 2.0 failed 1 times, most recent failure: Lost task 14.0 in stage 2.0 (TID 16) (executor driver): org.apache.hudi.exception.HoodieIOException: Unrecognized token 'testvalue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
       at [Source: (String)"testvalue"; line: 1, column: 10]
              at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:96)
              at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:87)
              at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
              at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
              at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
              at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
              at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
              at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
              at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
              at org.apache.spark.scheduler.Task.run(Task.scala:136)
              at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
              at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'testvalue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
       at [Source: (String)"testvalue"; line: 1, column: 10]
              at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
              at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
              at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2961)
              at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2939)
              at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._matchToken(ReaderBasedJsonParser.java:2713)
              at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._matchTrue(ReaderBasedJsonParser.java:2667)
              at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:767)
              at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4761)
              at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4667)
              at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
              at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
              at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:93)
              ... 17 more


    People

      Assignee: Unassigned
      Reporter: Erik LaBianca (easel)
      Votes: 0
      Watchers: 1

    Dates

      Created:
      Updated: