SPARK-39191: KafkaDataConsumer does not support transactional producers


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.4.5
    • Fix Version/s: None
    • Component/s: Java API
    • Labels: None

    Description

      When using transactions, Kafka inserts "control batches" into the log to mark whether messages were part of a transaction.

      These control batches are also assigned offsets, so even though I sent only a single record, the offset increased by 2.

      KafkaDataConsumer, however, assumes that Kafka offsets always increase by exactly one, which causes the following exception:

      Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 101.0 failed 4 times, most recent failure: Lost task 8.3 in stage 101.0 (TID 2987, executor 1): java.lang.IllegalArgumentException: requirement failed: Got wrong record for spark-executor-source_from_kafka topic-3 even after seeking to offset 1071229367 got offset 1071229368 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
          at scala.Predef$.require(Predef.scala:224)
          at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
          at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
          at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:218)
          at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
          at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
          at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
          at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
          at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
          at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
          at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
          at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
          at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
          at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:634)
          at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$2$$anon$1.next(InMemoryRelation.scala:116)
          at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$2$$anon$1.next(InMemoryRelation.scala:108)
          at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
          at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1107)
          at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1098)
          at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1033)
          at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1098)
          at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:824)
          at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
          at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
          at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
          at org.apache.spark.scheduler.Task.run(Task.scala:110)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
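
      For context, here is a minimal sketch of a transactional producer that creates the offset gap described above. The broker address, topic name, and transactional.id below are placeholders, not values from the actual job:

      import java.util.Properties
      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

      object TransactionalOffsetGap {
        def main(args: Array[String]): Unit = {
          val props = new Properties()
          // Placeholder broker address; point this at a real cluster.
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          // Setting transactional.id makes the producer transactional.
          props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "offset-gap-repro")

          val producer = new KafkaProducer[String, String](props)
          producer.initTransactions()
          producer.beginTransaction()
          // One data record is sent...
          producer.send(new ProducerRecord("some_topic", "key", "value"))
          // ...but the commit appends a control batch to the partition,
          // so the log-end offset advances by 2, not 1.
          producer.commitTransaction()
          producer.close()
        }
      }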

       
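      The exception message itself suggests spark.streaming.kafka.allowNonConsecutiveOffsets as a possible workaround. A sketch of enabling it, assuming the DStream kafka010 API shown in the stack trace (the app name and batch interval are placeholders):

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      // Sketch only. With this flag set, KafkaDataConsumer tolerates offset
      // gaps (compacted topics, transactional control batches) instead of
      // failing the require() seen above. This works around the symptom; it
      // does not make the consumer transaction-aware.
      val conf = new SparkConf()
        .setAppName("kafka-transactional-read")
        .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
      val ssc = new StreamingContext(conf, Seconds(10))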

          People

            Assignee: Unassigned
            Reporter: SilkyAlex
