Kafka / KAFKA-12671

Out of order processing with a transactional producer can lead to a stuck LastStableOffset


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0
    • Fix Version/s: None
    • Component/s: None

    Description

      If incoming produce requests and EndTxn requests are processed in a pathological order, then the LastStableOffset can get stuck, which blocks consuming in READ_COMMITTED mode.

      To transactionally produce, the standard flow is to InitProducerId, and then loop AddPartitionsToTxn -> Produce+ -> EndTxn. AddPartitionsToTxn is responsible for fencing and for adding partitions to a transaction, and EndTxn is responsible for finishing the transaction. Producing itself is mostly uninvolved in the proper fencing / ending flow, but produce requests must arrive after AddPartitionsToTxn and before EndTxn.
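
      A minimal sketch of this flow using the kgo client (the client in the logs below), assuming kgo's transactional API; the broker address, topic, and transactional ID are placeholders. InitProducerId is issued at client startup, and AddPartitionsToTxn is sent lazily as partitions are first produced to:

      package main

      import (
          "context"
          "log"

          "github.com/twmb/franz-go/pkg/kgo"
      )

      func main() {
          // TransactionalID causes the client to issue InitProducerId
          // on startup.
          cl, err := kgo.NewClient(
              kgo.SeedBrokers("localhost:9092"),     // placeholder broker
              kgo.TransactionalID("example-txn-id"), // placeholder ID
          )
          if err != nil {
              log.Fatal(err)
          }
          defer cl.Close()

          ctx := context.Background()

          // Begin the transaction; AddPartitionsToTxn is sent for each
          // partition the first time it is produced to.
          if err := cl.BeginTransaction(); err != nil {
              log.Fatal(err)
          }
          cl.Produce(ctx, &kgo.Record{Topic: "example-topic", Value: []byte("v")},
              func(_ *kgo.Record, err error) {
                  if err != nil {
                      log.Printf("produce failed: %v", err)
                  }
              },
          )

          // EndTransaction waits for outstanding produces and then
          // issues EndTxn to commit (or, with kgo.TryAbort, to abort).
          if err := cl.EndTransaction(ctx, kgo.TryCommit); err != nil {
              log.Fatal(err)
          }
      }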

      When a ProduceRequest is handled, Kafka uses an internal ProducerStateManager to loosely track transactions. The ProducerStateManager is completely independent of the TxnCoordinator, and its guarantees are relatively weak. It handles two types of "batches" being added: data batches and transaction markers. When a data batch is added, a "transaction" begins and is tied to the producer ID that produced the batch. When a transaction marker is handled, the ProducerStateManager removes the transaction for that producer ID (roughly).
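
      To make the weak guarantees concrete, here is a toy model of that bookkeeping in Go. This is purely illustrative and is not Kafka's actual (Scala) implementation:

      package toymodel

      // producerStateManager models the per-partition bookkeeping
      // described above.
      type producerStateManager struct {
          // ongoing maps a producer ID to the first offset of its
          // currently open transaction on this partition.
          ongoing map[int64]int64
      }

      func newProducerStateManager() *producerStateManager {
          return &producerStateManager{ongoing: make(map[int64]int64)}
      }

      // appendDataBatch begins a "transaction" for the producer ID if
      // one is not already open. Note there is no check with the
      // TxnCoordinator that a transaction is actually in progress.
      func (m *producerStateManager) appendDataBatch(producerID, firstOffset int64) {
          if _, ok := m.ongoing[producerID]; !ok {
              m.ongoing[producerID] = firstOffset
          }
      }

      // appendTxnMarker ends the transaction for the producer ID
      // (roughly; the real code also tracks aborted ranges and more).
      func (m *producerStateManager) appendTxnMarker(producerID int64) {
          delete(m.ongoing, producerID)
      }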

      EndTxn is what triggers transaction markers to be sent to the ProducerStateManager. In essence, EndTxn is the one part of the transactional producer flow that talks across both the TxnCoordinator and the ProducerStateManager.

      If a ProduceRequest is issued before EndTxn but handled internally in Kafka after EndTxn, then the ProduceRequest begins a new transaction in the ProducerStateManager. If the client was disconnecting and the EndTxn was the final request issued, the new transaction created in the ProducerStateManager is orphaned and nothing can clean it up. The LastStableOffset then hangs on this orphaned transaction.
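
      Concretely, the LastStableOffset is (roughly) the smallest first offset among still-open transactions, so a single orphaned entry pins it indefinitely. Continuing the toy model:

      // lastStableOffset falls back to the high watermark when no
      // transaction is open; otherwise it is capped at the first offset
      // of the earliest open transaction. One orphaned entry in ongoing
      // pins this value forever, blocking READ_COMMITTED consumers.
      func (m *producerStateManager) lastStableOffset(highWatermark int64) int64 {
          lso := highWatermark
          for _, firstOffset := range m.ongoing {
              if firstOffset < lso {
                  lso = firstOffset
              }
          }
          return lso
      }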

      This same problem can be triggered by a produce request that is issued with a transactional ID entirely outside the context of a transaction (no AddPartitionsToTxn). The problem cannot be triggered by producing for so long that the transaction expires, because in that case the transaction coordinator bumps the epoch for the producer ID, so producing again with the old epoch does not work.
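
      To illustrate why the epoch bump saves the expiry case, continuing the toy model (epoch tracking here is heavily simplified):

      // appendDataBatchWithEpoch rejects batches from a fenced (old)
      // epoch. When the coordinator expires a transaction it bumps the
      // epoch, so a late produce with the old epoch fails here instead
      // of opening a new, orphaned transaction.
      func (m *producerStateManager) appendDataBatchWithEpoch(producerID, firstOffset int64, epoch int16, epochs map[int64]int16) error {
          if epoch < epochs[producerID] {
              return errors.New("INVALID_PRODUCER_EPOCH") // errors import assumed
          }
          m.appendDataBatch(producerID, firstOffset)
          return nil
      }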

      Theoretically, we are supposed to have unlimited retries on produce requests, but in the context of wanting to abort everything and shut down, this is not always feasible. As it currently stands, I'm not sure there is a truly safe way to shut down without flushing and receiving responses for every record produced, even if I want to abort everything and quit. The safest approach I can think of is to avoid issuing EndTxn at all and instead rely on Kafka to internally time the transaction out.
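
      In client terms, the "safe" shutdown therefore looks like the sketch below, continuing the kgo sketch from above, even when the goal is to abort everything; the ordering (flush fully, then end) is the entire point:

      // safeAbortShutdown waits for every outstanding produce response
      // before issuing EndTxn, avoiding the reordering described above
      // at the cost of not being able to quit immediately.
      func safeAbortShutdown(ctx context.Context, cl *kgo.Client) error {
          // Flush blocks until all buffered and in-flight produce
          // requests have received responses.
          if err := cl.Flush(ctx); err != nil {
              return err
          }
          // Only now is it safe to send EndTxn (here, aborting).
          return cl.EndTransaction(ctx, kgo.TryAbort)
      }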

      For some context, here are my request logs from the client. Note that I write two ProduceRequests, read one, and then issue EndTxn (because I know I want to quit). The second ProduceRequest is read successfully before shutdown, but I ignore the result because I am shutting down. I've taken out logs related to consuming, but the order of the logs is unchanged:

      [INFO] done waiting for unknown topic, metadata was successful; topic: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765
      [INFO] initializing producer id
      [DEBUG] wrote FindCoordinator v3; err: <nil>
      [DEBUG] read FindCoordinator v3; err: <nil>
      [DEBUG] wrote InitProducerID v4; err: <nil>
      [DEBUG] read InitProducerID v4; err: <nil>
      [INFO] producer id initialization success; id: 1463, epoch: 0
      [DEBUG] wrote AddPartitionsToTxn v2; err: <nil>
      [DEBUG] read AddPartitionsToTxn v2; err: <nil>
      [DEBUG] wrote Produce v8; err: <nil>
      [DEBUG] read Produce v8; err: <nil>
      [DEBUG] produced; to: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{15589=>19686}]
      [DEBUG] wrote Produce v8; err: <nil>
      [DEBUG] wrote EndTxn v2; err: <nil>
      [DEBUG] read EndTxn v2; err: <nil>
      [DEBUG] read from broker errored, killing connection; addr: localhost:9092, id: 1, successful_reads: 1, err: context canceled
      [DEBUG] read Produce v8; err: <nil>
      [DEBUG] produced; to: 2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765[1{skipped}]
      

      And then from the broker's point of view. Across two brokers, the second ProduceRequest completes after EndTxn is handled (and after the WriteTxnMarkers request, which is the request that actually hooks into the ProducerStateManager):

      /// Broker 3: init producer ID
      [2021-04-15 00:56:40,030] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=kgo, correlationId=3) -- {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,transaction_timeout_ms=60000,producer_id=-1,producer_epoch=-1,_tagged_fields={}},response:{throttle_time_ms=0,error_code=0,producer_id=1463,producer_epoch=0,_tagged_fields={}} from connection 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:2.255,requestQueueTime:0.077,localTime:0.74,remoteTime:0.095,throttleTime:0,responseQueueTime:1.005,sendTime:0.336,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0) (kafka.request.logger)
      
      
      /// Broker 3: add partitions to txn
      [2021-04-15 00:56:40,071] DEBUG Completed request:RequestHeader(apiKey=ADD_PARTITIONS_TO_TXN, apiVersion=2, clientId=kgo, correlationId=4) -- {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[1]}]},response:{throttle_time_ms=0,results=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,results=[{partition_index=1,error_code=0}]}]} from connection 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:1.247,requestQueueTime:0.133,localTime:0.71,remoteTime:0.136,throttleTime:0,responseQueueTime:0.087,sendTime:0.178,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0) (kafka.request.logger)
      
      
      /// Broker 2: first produce
      [2021-04-15 00:56:40,223] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=kgo, correlationId=1) -- {acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=15589,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0} from connection 127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:2.705,requestQueueTime:0.055,localTime:2.435,remoteTime:0.058,throttleTime:0,responseQueueTime:0.055,sendTime:0.1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0),temporaryMemoryBytes:324898 (kafka.request.logger)
      
      
      /// Broker 3: end txn
      [2021-04-15 00:56:40,350] DEBUG Completed request:RequestHeader(apiKey=END_TXN, apiVersion=2, clientId=kgo, correlationId=5) -- {transactional_id=168e4dfe174060600305d8e998f08e1688bd7f48c7381cf979fff0e8a119f570,producer_id=1463,producer_epoch=0,committed=false},response:{throttle_time_ms=0,error_code=0} from connection 127.0.0.1:9096-127.0.0.1:57450-1557;totalTime:3.484,requestQueueTime:0.052,localTime:0.318,remoteTime:0.06,throttleTime:0,responseQueueTime:2.92,sendTime:0.133,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0) (kafka.request.logger)
      
      
      /// Broker 2: txn markers
      [2021-04-15 00:56:40,357] DEBUG Completed request:RequestHeader(apiKey=WRITE_TXN_MARKERS, apiVersion=0, clientId=broker-3-txn-marker-sender, correlationId=66708) -- {markers=[{producer_id=1463,producer_epoch=0,transaction_result=false,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_indexes=[1]}],coordinator_epoch=0}]},response:{markers=[{producer_id=1463,topics=[{name=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partitions=[{partition_index=1,error_code=0}]}]}]} from connection 127.0.0.1:9094-127.0.0.1:38966-676;totalTime:3.507,requestQueueTime:1.957,localTime:0.34,remoteTime:0.031,throttleTime:0,responseQueueTime:0.324,sendTime:0.853,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=unknown, softwareVersion=unknown) (kafka.request.logger)
      
      
      /// Broker 2: second produce
      [2021-04-15 00:56:40,374] DEBUG Completed request:RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=kgo, correlationId=2) -- {acks=-1,timeout=30000,numPartitions=1},response:{responses=[{topic=2135e281e688406306830c19b0aca37476dd5b0b4b50689672a9244203721765,partition_responses=[{partition=1,error_code=0,base_offset=19687,log_append_time=-1,log_start_offset=0,record_errors=[],error_message=null}]}],throttle_time_ms=0} from connection 127.0.0.1:9094-127.0.0.1:59022-1639;totalTime:4.45,requestQueueTime:0.603,localTime:2.721,remoteTime:0.051,throttleTime:0,responseQueueTime:0.043,sendTime:1.031,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=kgo, softwareVersion=0.1.0),temporaryMemoryBytes:356824 (kafka.request.logger)
      

      I believe one fix for this would be to only allow transactions to start in the ProducerStateManager if a transaction has actually begun through AddPartitionsToTxn, and to reject produce requests to partitions that have not been added to a txn. An alternative fix would be to wait for all produce requests to finish before issuing EndTxn, but this seems less desirable when wanting to shut down and abort progress. Another alternative is to avoid issuing EndTxn and just shut down, but this also seems undesirable and will block consumers until the transaction timeout expires.
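
      Sketching the first option against the toy model above (the real change would live in the broker, and the error value here is a hypothetical stand-in):

      // appendDataBatchChecked only opens a transaction for a producer
      // ID that AddPartitionsToTxn has registered for this partition;
      // anything else is rejected instead of silently beginning a
      // transaction that nothing may ever close.
      func (m *producerStateManager) appendDataBatchChecked(producerID, firstOffset int64, addedToTxn map[int64]bool) error {
          if !addedToTxn[producerID] {
              return errors.New("partition was not added to the transaction") // errors import assumed
          }
          m.appendDataBatch(producerID, firstOffset)
          return nil
      }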

      This may be the cause of KAFKA-5880.


People

    • Assignee: Unassigned
    • Reporter: Travis Bischel (twmb)
    • Votes: 1
    • Watchers: 9
