SPARK-29639: Spark Kafka connector 0.10.0 generates incorrect end offsets for micro batch


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Abandoned
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: None

    Description

We have been running a Spark Structured Streaming job in production for more than a week now. Put simply, it reads data from a source Kafka topic (with 4 partitions) and writes to another Kafka topic. Everything had been running fine until the job started failing with the following error:
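For reference, a minimal sketch of this kind of pipeline (the broker address, output topic, and checkpoint path below are illustrative placeholders, not the actual production code):

  import org.apache.spark.sql.{DataFrame, SparkSession}

  object MetricComputer {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().appName("MetricComputer").getOrCreate()

      // Source: the 4-partition Kafka topic from the report
      val source = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092") // placeholder
        .option("subscribe", "kafka-topic-name")
        .load()

      // Sink: the progress logs below show a ForeachBatchSink writing each micro-batch back to Kafka
      val query = source.writeStream
        .foreachBatch { (batch: DataFrame, batchId: Long) =>
          batch.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
            .write
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092") // placeholder
            .option("topic", "output-topic-name")             // placeholder
            .save()
        }
        .option("checkpointLocation", "hdfs:///checkpoints/metric-computer") // placeholder
        .start()

      query.awaitTermination()
    }
  }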


  Driver stacktrace:
  === Streaming Query ===
  Identifier: MetricComputer [id = af26bab1-0d89-4766-934e-ad5752d6bb08, runId = 613a21ad-86e3-4781-891b-17d92c18954a]
  Current Committed Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: {"kafka-topic-name":{"2":10458347,"1":10460151,"3":10475678,"0":9809564}}}
  Current Available Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: {"kafka-topic-name":{"2":10458347,"1":10460151,"3":10475678,"0":10509527}}}
  Current State: ACTIVE
  Thread State: RUNNABLE
  <-- Removed Logical plan -->
  Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you don't want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "false".
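As the error message suggests, the failure itself can be suppressed (at the cost of silently skipping unavailable data) by setting the documented failOnDataLoss option on the source; an illustrative fragment:

  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092") // placeholder
    .option("subscribe", "kafka-topic-name")
    .option("failOnDataLoss", "false") // don't fail the query when offsets are no longer available
    .load()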

Configuration:

Spark 2.4.0
spark-sql-kafka-0-10 (the Spark connector for Kafka 0.10+)
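For completeness, the corresponding build dependency (assuming sbt and the Scala 2.11 builds of Spark 2.4.0) would look something like:

  // build.sbt (illustrative)
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-sql"            % "2.4.0",
    "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
  )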

Looking at the Structured Streaming query progress logs, the end offsets computed for the next batch were actually smaller than the starting offsets:

Micro-batch trigger 1:

      2019/10/26 23:53:51 INFO utils.Logging[26]: 2019-10-26 23:53:51.767 : ( : Query {
        "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
        "runId" : "2d20d633-2768-446c-845b-893243361422",
        "name" : "StreamingProcessorName",
        "timestamp" : "2019-10-26T23:53:51.741Z",
        "batchId" : 2145898,
        "numInputRows" : 0,
        "inputRowsPerSecond" : 0.0,
        "processedRowsPerSecond" : 0.0,
        "durationMs" : {
          "getEndOffset" : 0,
          "setOffsetRange" : 9,
          "triggerExecution" : 9
        },
        "stateOperators" : [ ],
        "sources" : [ {
          "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
          "startOffset" : {
            "kafka-topic-name" : {
              "2" : 10452513,
              "1" : 10454326,
              "3" : 10469196,
              "0" : 10503762
            }
          },
          "endOffset" : {
            "kafka-topic-name" : {
              "2" : 10452513,
              "1" : 10454326,
              "3" : 10469196,
              "0" : 10503762
            }
          },
          "numInputRows" : 0,
          "inputRowsPerSecond" : 0.0,
          "processedRowsPerSecond" : 0.0
        } ],
        "sink" : {
          "description" : "ForeachBatchSink"
        }
      } in progress

Next micro-batch trigger:

      2019/10/26 23:53:53 INFO utils.Logging[26]: 2019-10-26 23:53:53.951 : ( : Query {
        "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
        "runId" : "2d20d633-2768-446c-845b-893243361422",
        "name" : "StreamingProcessorName",
        "timestamp" : "2019-10-26T23:53:52.907Z",
        "batchId" : 2145898,
        "numInputRows" : 0,
        "inputRowsPerSecond" : 0.0,
        "processedRowsPerSecond" : 0.0,
        "durationMs" : {
          "addBatch" : 350,
          "getBatch" : 4,
          "getEndOffset" : 0,
          "queryPlanning" : 102,
          "setOffsetRange" : 24,
          "triggerExecution" : 1043,
          "walCommit" : 349
        },
        "stateOperators" : [ ],
        "sources" : [ {
          "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
          "startOffset" : {
            "kafka-topic-name" : {
              "2" : 10452513,
              "1" : 10454326,
              "3" : 10469196,
              "0" : 10503762
            }
          },
          "endOffset" : {
            "kafka-topic-name" : {
              "2" : 10452513,
              "1" : 10454326,
              "3" : 9773098,
              "0" : 10503762
            }
          },
          "numInputRows" : 0,
          "inputRowsPerSecond" : 0.0,
          "processedRowsPerSecond" : 0.0
        } ],
        "sink" : {
          "description" : "ForeachBatchSink"
        }
      } in progress

Notice that for partition 3 of the Kafka topic, the end offset (9773098) is actually smaller than the start offset (10469196), a jump backwards of 696,098 offsets!

We checked the HDFS checkpoint dir, and the checkpointed offsets look fine; they point to the last committed offsets.
Why is the end offset for a partition being computed as a smaller value?
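To make regressions like this easier to catch, one option (a hypothetical diagnostic, not part of the reported job) is a StreamingQueryListener that flags any source whose planned end offset falls behind its start offset:

  import com.fasterxml.jackson.databind.ObjectMapper
  import org.apache.spark.sql.streaming.StreamingQueryListener
  import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
  import scala.collection.JavaConverters._

  // Hypothetical diagnostic: warn whenever a batch plans an end offset behind its start offset.
  object OffsetRegressionListener extends StreamingQueryListener {
    override def onQueryStarted(event: QueryStartedEvent): Unit = ()
    override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()

    override def onQueryProgress(event: QueryProgressEvent): Unit = {
      event.progress.sources.foreach { src =>
        val start = parseOffsets(src.startOffset)
        val end   = parseOffsets(src.endOffset)
        for ((tp, s) <- start; e <- end.get(tp); if e < s) {
          println(s"WARN batch ${event.progress.batchId}: end offset regressed on $tp (start=$s, end=$e)")
        }
      }
    }

    // startOffset/endOffset are JSON strings like {"kafka-topic-name":{"0":10503762,...}};
    // parsed here with the Jackson that ships on Spark's classpath
    private def parseOffsets(json: String): Map[String, Long] = {
      if (json == null) return Map.empty
      val root = new ObjectMapper().readTree(json)
      root.fields.asScala.flatMap { topic =>
        topic.getValue.fields.asScala.map { part =>
          s"${topic.getKey}-${part.getKey}" -> part.getValue.asLong
        }
      }.toMap
    }
  }

Registering it with spark.streams.addListener(OffsetRegressionListener) before start() makes every batch's planned offset range visible without scraping the progress logs.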


People

Assignee: Unassigned
Reporter: Abhinav Choudhury (abhinavchdhry)