KAFKA-13404

Kafka sink connectors do not commit offset correctly if messages are produced in transaction


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.6.1
    • Fix Version/s: None
    • Component/s: connect
    • Labels: None

    Description

The Kafka sink connectors don't commit offsets up to the latest log-end offset if the messages were produced in a transaction.

From the code of WorkerSinkTask.java, we found that the sink connector takes the offsets from the consumed messages and commits them to Kafka after the messages have been processed successfully. But for messages produced in a transaction, the log also contains a control batch that indicates whether the transaction was committed or aborted, and that batch is never covered by the committed offset.
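As a minimal sketch (assumed names; this is not the actual WorkerSinkTask code), the record-based derivation looks roughly like the following. With 10 transactional records at offsets 0-9 it yields a commit offset of 10, while the COMMIT control batch occupies offset 10 and the log-end offset is 11:

      import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata}
      import org.apache.kafka.common.TopicPartition

      object RecordBasedCommit {
        // Commit offset derived from the last record handed to the sink task:
        // last data record at offset 9 => OffsetAndMetadata(10), one short of
        // the log-end offset 11 that includes the transaction COMMIT marker.
        def offsetToCommit(last: ConsumerRecord[_, _]): (TopicPartition, OffsetAndMetadata) =
          new TopicPartition(last.topic(), last.partition()) ->
            new OffsetAndMetadata(last.offset() + 1)
      }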

       

      You can reproduce it by running `connect-file-sink` with the following properties:

      /opt/kafka/bin/connect-standalone.sh /connect-standalone.properties /connect-file-sink.properties
      # connect-standalone.properties
      bootstrap.servers=localhost:9092
      
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.storage.StringConverter
      
      key.converter.schemas.enable=true
      value.converter.schemas.enable=true
      
      # for testing
      offset.flush.interval.ms=10000
      
      consumer.isolation.level=read_committed
      consumer.auto.offset.reset=none
      
      # connect-file-sink.properties
      name=local-file-sink
      connector.class=FileStreamSink
      tasks.max=1
      file=/tmp/test.sink.txt
      topics=test

Then use the attached producer (Main.scala) to produce 10 messages to the `test` topic in a transaction.
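The attachment itself is not inlined here, but a minimal transactional producer along those lines might look like this (the broker address, transactional.id, and payloads are assumptions matching the repro setup above):

      import java.util.Properties
      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
      import org.apache.kafka.common.serialization.StringSerializer

      object Main extends App {
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-producer")

        val producer = new KafkaProducer(props, new StringSerializer, new StringSerializer)
        producer.initTransactions()
        producer.beginTransaction()
        for (i <- 1 to 10)
          producer.send(new ProducerRecord[String, String](
            "test", s"""{"value": "msg-$i", "time": ${System.currentTimeMillis / 1000}}"""))
        // Committing appends a control batch (the COMMIT marker) after the data
        // records, so the log-end offset lands one past the last data record.
        producer.commitTransaction()
        producer.close()
      }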

You can see that the topic log-end offset is now 11 and the last record in the segment file is a control batch, but the consumer group offset is still at 10. (If the records are deleted by topic retention, you will get an OffsetOutOfRange exception after restarting the connector.)

      bash-5.1# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group connect-local-file-sink --describe 
      GROUP                   TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                               HOST            CLIENT-ID                                        
      connect-local-file-sink test            0          10              11              1               connector-consumer-local-file-sink-0-10777adb-72c2-4fd3-8773-4f5a0498903d /172.21.0.3     connector-consumer-local-file-sink-0
      
      bash-5.1# /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/test-0/00000000000000000000.log --print-data-log
      Dumping /kafka/test-0/00000000000000000000.log
      Starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1634805907230 size: 208 magic: 2 compresscodec: GZIP crc: 2170304005 isvalid: true
      | offset: 0 CreateTime: 1634805907210 keysize: -1 valuesize: 39 sequence: 0 headerKeys: [] payload: {"value": "banana", "time": 1634805907}
      | offset: 1 CreateTime: 1634805907230 keysize: -1 valuesize: 39 sequence: 1 headerKeys: [] payload: {"value": "banana", "time": 1634805907}
      | offset: 2 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 2 headerKeys: [] payload: {"value": "ice", "time": 1634805907}
      | offset: 3 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 3 headerKeys: [] payload: {"value": "apple", "time": 1634805907}
      | offset: 4 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 4 headerKeys: [] payload: {"value": "home", "time": 1634805907}
      | offset: 5 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 5 headerKeys: [] payload: {"value": "juice", "time": 1634805907}
      | offset: 6 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 6 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
      | offset: 7 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 7 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
      | offset: 8 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 8 headerKeys: [] payload: {"value": "girl", "time": 1634805907}
      | offset: 9 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 9 headerKeys: [] payload: {"value": "cat", "time": 1634805907}
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 208 CreateTime: 1634805908149 size: 78 magic: 2 compresscodec: NONE crc: 1662003889 isvalid: true
      | offset: 10 CreateTime: 1634805908149 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
      

       

I think we should use KafkaConsumer.position() to get the correct offset instead of the offsets in the messages. I will create a PR for that later.
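As a rough sketch of that direction (an assumed helper, not an actual Kafka patch), the commit offsets could be taken from the consumer's position, which in read_committed mode has already advanced past the control batch:

      import scala.jdk.CollectionConverters._
      import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
      import org.apache.kafka.common.TopicPartition

      object PositionBasedCommit {
        // position() is the offset of the next record to fetch; after consuming
        // a committed transaction it points past the COMMIT marker (11 here),
        // unlike lastRecord.offset() + 1 (10 here).
        def offsetsToCommit(consumer: KafkaConsumer[_, _]): Map[TopicPartition, OffsetAndMetadata] =
          consumer.assignment().asScala.map { tp =>
            tp -> new OffsetAndMetadata(consumer.position(tp))
          }.toMap
      }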

       

      Attachments

        1. Main.scala
          3 kB
          Yu-Jhe Li


            People

              Assignee: anupamaggarwal (Anupam Aggarwal)
              Reporter: yujhe.li (Yu-Jhe Li)
              Votes: 0
              Watchers: 2
