Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-4815 Idempotent/transactional Producer (KIP-98)
  3. KAFKA-5355

Broker returns messages beyond "latest stable offset" to transactional consumer in read_committed mode

    XMLWordPrintableJSON

Details

    • Sub-task
    • Status: Resolved
    • Blocker
    • Resolution: Fixed
    • 0.11.0.0
    • 0.11.0.0
    • core

    Description

      This issue is exposed by the new Streams EOS integration test.

      Streams has two tasks (ie, two producers with pid 0 and 2000) both writing to output topic output with one partition (replication factor 1).

      The test uses an transactional consumer with group.id=readCommitted to read the data from output topic. When it read the data, each producer has committed 10 records (one producer write messages with key=0 and the other with key=1). Furthermore, each producer has an open transaction and 5 uncommitted records written.

      The test fails, as we expect to see 10 records per key, but we get 15 for key=1:

      java.lang.AssertionError: 
      Expected: <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 36), KeyValue(1, 45)]>
           but: was <[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 36), KeyValue(1, 45), KeyValue(1, 55), KeyValue(1, 66), KeyValue(1, 78), KeyValue(1, 91), KeyValue(1, 105)]>
      

      Dumping the segment shows, that there are two commit markers (one for each producer) for the first 10 messages written. Furthermore, there are 5 pending records. Thus, "latest stable offset" should be 21 (20 messages plus 2 commit markers) and not data should be returned beyond this offset.

      Dumped Log Segment output-0

      Starting offset: 0
      baseOffset: 0 lastOffset: 9 baseSequence: 0 lastSequence: 9 producerId: 0 producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 0 CreateTime: 1496255947332 isvalid: true size: 291 magic: 2 compresscodec: NONE crc: 600535135
      baseOffset: 10 lastOffset: 10 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 291 CreateTime: 1496256005429 isvalid: true size: 78 magic: 2 compresscodec: NONE crc: 3458060752
      baseOffset: 11 lastOffset: 20 baseSequence: 0 lastSequence: 9 producerId: 2000 producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true position: 369 CreateTime: 1496255947322 isvalid: true size: 291 magic: 2 compresscodec: NONE crc: 3392915713
      baseOffset: 21 lastOffset: 25 baseSequence: 10 lastSequence: 14 producerId: 0 producerEpoch: 6 partitionLeaderEpoch: 0 isTransactional: true position: 660 CreateTime: 1496255947342 isvalid: true size: 176 magic: 2 compresscodec: NONE crc: 3513911368
      baseOffset: 26 lastOffset: 26 baseSequence: -1 lastSequence: -1 producerId: 2000 producerEpoch: 2 partitionLeaderEpoch: 0 isTransactional: true position: 836 CreateTime: 1496256011784 isvalid: true size: 78 magic: 2 compresscodec: NONE crc: 1619151485
      

      Dump with --deep-iteration

      Starting offset: 0
      offset: 0 position: 0 CreateTime: 1496255947323 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 0 headerKeys: [] key: 1 payload: 0
      offset: 1 position: 0 CreateTime: 1496255947324 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 1 headerKeys: [] key: 1 payload: 1
      offset: 2 position: 0 CreateTime: 1496255947325 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 2 headerKeys: [] key: 1 payload: 3
      offset: 3 position: 0 CreateTime: 1496255947326 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 3 headerKeys: [] key: 1 payload: 6
      offset: 4 position: 0 CreateTime: 1496255947327 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 4 headerKeys: [] key: 1 payload: 10
      offset: 5 position: 0 CreateTime: 1496255947328 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 5 headerKeys: [] key: 1 payload: 15
      offset: 6 position: 0 CreateTime: 1496255947329 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 6 headerKeys: [] key: 1 payload: 21
      offset: 7 position: 0 CreateTime: 1496255947330 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 7 headerKeys: [] key: 1 payload: 28
      offset: 8 position: 0 CreateTime: 1496255947331 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 8 headerKeys: [] key: 1 payload: 36
      offset: 9 position: 0 CreateTime: 1496255947332 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 600535135 sequence: 9 headerKeys: [] key: 1 payload: 45
      offset: 10 position: 291 CreateTime: 1496256005429 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE crc: 3458060752 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
      offset: 11 position: 369 CreateTime: 1496255947313 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 0 headerKeys: [] key: 0 payload: 0
      offset: 12 position: 369 CreateTime: 1496255947314 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 1 headerKeys: [] key: 0 payload: 1
      offset: 13 position: 369 CreateTime: 1496255947315 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 2 headerKeys: [] key: 0 payload: 3
      offset: 14 position: 369 CreateTime: 1496255947316 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 3 headerKeys: [] key: 0 payload: 6
      offset: 15 position: 369 CreateTime: 1496255947317 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 4 headerKeys: [] key: 0 payload: 10
      offset: 16 position: 369 CreateTime: 1496255947318 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 5 headerKeys: [] key: 0 payload: 15
      offset: 17 position: 369 CreateTime: 1496255947319 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 6 headerKeys: [] key: 0 payload: 21
      offset: 18 position: 369 CreateTime: 1496255947320 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 7 headerKeys: [] key: 0 payload: 28
      offset: 19 position: 369 CreateTime: 1496255947321 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 8 headerKeys: [] key: 0 payload: 36
      offset: 20 position: 369 CreateTime: 1496255947322 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3392915713 sequence: 9 headerKeys: [] key: 0 payload: 45
      offset: 21 position: 660 CreateTime: 1496255947338 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 10 headerKeys: [] key: 1 payload: 55
      offset: 22 position: 660 CreateTime: 1496255947339 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 11 headerKeys: [] key: 1 payload: 66
      offset: 23 position: 660 CreateTime: 1496255947340 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 12 headerKeys: [] key: 1 payload: 78
      offset: 24 position: 660 CreateTime: 1496255947341 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 13 headerKeys: [] key: 1 payload: 91
      offset: 25 position: 660 CreateTime: 1496255947342 isvalid: true keysize: 8 valuesize: 8 magic: 2 compresscodec: NONE crc: 3513911368 sequence: 14 headerKeys: [] key: 1 payload: 105
      offset: 26 position: 836 CreateTime: 1496256011784 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE crc: 1619151485 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
      

      The client log shows, that the reading and writing happen concurrently.

      [2017-05-31 11:40:11,642] DEBUG Resetting offset for partition outputTopic-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:418)
      [2017-05-31 11:40:11,642] DEBUG Added fetch request for partition outputTopic-0 at offset 0 to node 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:793)
      [2017-05-31 11:40:11,642] DEBUG Sending fetch for partitions [outputTopic-0] to broker 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:203)
      [2017-05-31 11:40:11,643] DEBUG Added fetch request for partition outputTopic-0 at offset 11 to node 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:793)
      [2017-05-31 11:40:11,643] DEBUG Sending fetch for partitions [outputTopic-0] to broker 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:203)
      [2017-05-31 11:40:11,673] DEBUG TransactionalId: appId-1-0_1 -- Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=appId-1, coordinatorType=GROUP) (org.apache.kafka.clients.producer.internals.Sender:314)
      [2017-05-31 11:40:11,674] DEBUG TransactionalId appId-1-0_1 -- Received FindCoordinator response with error NONE (org.apache.kafka.clients.producer.internals.TransactionManager:738)
      [2017-05-31 11:40:11,674] DEBUG TransactionalId: appId-1-0_1 -- Sending transactional request (transactionalId=appId-1-0_1, producerId=2000, producerEpoch=2, consumerGroupId=appId-1, offsets={inputTopic-1=CommittedOffset(offset=10, metadata='')}) (org.apache.kafka.clients.producer.internals.Sender:314)
      [2017-05-31 11:40:11,717] DEBUG Dest: 127.0.0.1:64267 (id: 0 rack: null) : producerId: 0, epoch: 6, Assigning sequence for outputTopic-0: 10 (org.apache.kafka.clients.producer.internals.RecordAccumulator:488)
      [2017-05-31 11:40:11,717] DEBUG Dest: 127.0.0.1:64273 (id: 2 rack: null) : producerId: 0, epoch: 6, Assigning sequence for appId-1-store-changelog-0: 10 (org.apache.kafka.clients.producer.internals.RecordAccumulator:488)
      [2017-05-31 11:40:11,718] DEBUG Incremented sequence number for topic-partition appId-1-store-changelog-0 to 15 (org.apache.kafka.clients.producer.internals.Sender:555)
      [2017-05-31 11:40:11,718] DEBUG Incremented sequence number for topic-partition outputTopic-0 to 15 (org.apache.kafka.clients.producer.internals.Sender:555)
      [2017-05-31 11:40:11,780] DEBUG TransactionalId appId-1-0_1 -- Received TxnOffsetCommit response with errors {inputTopic-1=NONE} (org.apache.kafka.clients.producer.internals.TransactionManager:900)
      [2017-05-31 11:40:11,780] DEBUG TransactionalId appId-1-0_1 -- Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION (org.apache.kafka.clients.producer.internals.TransactionManager:427)
      [2017-05-31 11:40:11,780] DEBUG TransactionalId: appId-1-0_1 -- Sending transactional request (transactionalId=appId-1-0_1, producerId=2000, producerEpoch=2, result=COMMIT) (org.apache.kafka.clients.producer.internals.Sender:314)
      [2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Received EndTxn response with error NONE (org.apache.kafka.clients.producer.internals.TransactionManager:792)
      [2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Transition from state COMMITTING_TRANSACTION to READY (org.apache.kafka.clients.producer.internals.TransactionManager:427)
      [2017-05-31 11:40:11,782] DEBUG TransactionalId appId-1-0_1 -- Transition from state READY to IN_TRANSACTION (org.apache.kafka.clients.producer.internals.TransactionManager:427)
      [2017-05-31 11:40:11,782] DEBUG Initiating connection to node 0 at 127.0.0.1:64267. (org.apache.kafka.clients.NetworkClient:707)
      [2017-05-31 11:40:11,782] DEBUG Added fetch request for partition inputTopic-1 at offset 11 to node 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:793)
      [2017-05-31 11:40:11,782] DEBUG Sending fetch for partitions [inputTopic-1] to broker 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:203)
      [2017-05-31 11:40:11,783] DEBUG Completed connection to node 0.  Ready. (org.apache.kafka.clients.NetworkClient:672)
      [2017-05-31 11:40:11,783] DEBUG TransactionalId: appId-1-0_1 -- Sending transactional request (transactionalId=appId-1-0_1, producerId=2000, producerEpoch=2, partitions=[outputTopic-0, appId-1-store-changelog-1]) (org.apache.kafka.clients.producer.internals.Sender:314)
      [2017-05-31 11:40:11,784] DEBUG Added fetch request for partition inputTopic-1 at offset 15 to node 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:793)
      [2017-05-31 11:40:11,784] DEBUG Sending fetch for partitions [inputTopic-1] to broker 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:203)
      [2017-05-31 11:40:11,784] DEBUG TransactionalId appId-1-0_1 -- Received AddPartitionsToTxn response with errors {outputTopic-0=CONCURRENT_TRANSACTIONS, appId-1-store-changelog-1=CONCURRENT_TRANSACTIONS} (org.apache.kafka.clients.producer.internals.TransactionManager:658)
      [2017-05-31 11:40:11,784] DEBUG TransactionalId: appId-1-0_1 -- Sending transactional request (transactionalId=appId-1-0_1, producerId=2000, producerEpoch=2, partitions=[outputTopic-0, appId-1-store-changelog-1]) (org.apache.kafka.clients.producer.internals.Sender:314)
      [2017-05-31 11:40:11,784] DEBUG Added fetch request for partition outputTopic-0 at offset 27 to node 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:793)
      [2017-05-31 11:40:11,785] DEBUG Sending fetch for partitions [outputTopic-0] to broker 127.0.0.1:64267 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:203)
      

      (full log attached)

      Attachments

        1. test.log
          415 kB
          Matthias J. Sax

        Issue Links

          Activity

            People

              hachikuji Jason Gustafson
              mjsax Matthias J. Sax
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: