KAFKA-16699 (project: Kafka)

Have Streams treat InvalidPidMappingException like a ProducerFencedException


Details

    • Type: Improvement
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      KStreams handles ProducerFencedException (among other errors) cleanly. It does this by closing the task dirty and triggering a rebalance so that the worker threads rejoin the group; the producer is also recreated. Because of how Streams works (reading from and writing to various topics), the application can figure out the last thing the fenced producer completed and continue from there.
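      The recovery sequence above can be sketched as a pure state transition. This is a minimal Scala model under stated assumptions: `Task`, `Producer`, `StreamThreadState`, and `recoverFromFencing` are hypothetical stand-ins, not classes or methods of the Kafka Streams runtime.

```scala
object FencingRecoverySketch {
  // Hypothetical stand-in for a stream task (not the Kafka Streams Task class).
  final case class Task(id: String, closedDirty: Boolean = false)

  // Hypothetical stand-in for a transactional producer; `generation`
  // only counts how many times the producer has been recreated.
  final case class Producer(generation: Int)

  // Hypothetical per-thread state for the sketch.
  final case class StreamThreadState(
      tasks: List[Task],
      producer: Producer,
      rebalanceRequested: Boolean
  )

  // Steps from the description: close every task dirty (no commit),
  // recreate the producer, and request a rebalance so the thread rejoins the group.
  def recoverFromFencing(state: StreamThreadState): StreamThreadState =
    state.copy(
      tasks = state.tasks.map(_.copy(closedDirty = true)),
      producer = Producer(state.producer.generation + 1),
      rebalanceRequested = true
    )
}
```

The model is deliberately immutable so each step of the recovery is visible in the returned state; the real runtime performs these steps as side effects on live tasks and clients.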

      KStreams EOS V2 also trusts that any open transaction (including one whose producer is fenced) will be aborted by the server. This guarantee is central to how it operates. In EOS V1, the InitProducerId request from the new producer fences the old producer and aborts its previous transaction. In either case, we can reason about the last valid state of the fenced producer and how to proceed.
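      To make the EOS V1 fencing step concrete, here is a simplified Scala model of epoch-based fencing. It assumes a single state record per transactional ID and omits everything else the coordinator tracks (transaction states, epoch exhaustion, timeouts); `TxnState`, `initProducerId`, and `checkEpoch` are hypothetical names for this sketch only.

```scala
object EpochFencingSketch {
  sealed trait FencingError
  case object ProducerFenced extends FencingError

  // Simplified (hypothetical) per-transactional-ID state kept by the coordinator.
  final case class TxnState(producerId: Long, epoch: Short, ongoing: Boolean)

  // Simplified InitProducerId: a transactional ID with no state gets a fresh producer ID;
  // an existing one keeps its producer ID, bumps the epoch, and aborts any open transaction.
  def initProducerId(existing: Option[TxnState], freshProducerId: => Long): TxnState =
    existing match {
      case None =>
        TxnState(freshProducerId, 0.toShort, ongoing = false)
      case Some(s) =>
        s.copy(epoch = (s.epoch + 1).toShort, ongoing = false)
    }

  // A request carrying an older epoch than the coordinator's current one is fenced.
  def checkEpoch(state: TxnState, requestEpoch: Short): Either[FencingError, TxnState] =
    if (requestEpoch < state.epoch) Left(ProducerFenced) else Right(state)
}
```

The key property is that the old and new instances share a producer ID, so the epoch alone decides who is fenced. That property is exactly what is missing in the InvalidPidMapping cases discussed below.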

      InvalidPidMappingException ≈ ProducerFenced

      I argue that InvalidPidMappingException can be handled in the same way. Let me explain why.

      There are two cases in which we see this error:

      1. txnManager.getTransactionState(transactionalId).flatMap {
           case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)
         }

      2. if (txnMetadata.producerId != producerId) Left(Errors.INVALID_PRODUCER_ID_MAPPING)
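      Both broker-side checks can be folded into one validation function. This is a simplified Scala model, not the coordinator code: the snippet's `flatMap { case None => ... }` suggests the real lookup result is an `Option` wrapped in another container, which this sketch replaces with a plain `Map` lookup. `TxnMetadata`, `TxnStateMap`, and `validateProducerId` are hypothetical names.

```scala
object PidMappingCheckSketch {
  sealed trait CoordinatorError
  case object InvalidProducerIdMapping extends CoordinatorError

  // Hypothetical, trimmed-down transactional metadata.
  final case class TxnMetadata(transactionalId: String, producerId: Long, producerEpoch: Short)

  // Hypothetical stand-in for the coordinator's transactional state map.
  type TxnStateMap = Map[String, TxnMetadata]

  def validateProducerId(
      states: TxnStateMap,
      transactionalId: String,
      producerId: Long
  ): Either[CoordinatorError, TxnMetadata] =
    states.get(transactionalId) match {
      // Case 1: no state at all for this transactional ID.
      case None => Left(InvalidProducerIdMapping)
      // Case 2: state exists but is bound to a different producer ID.
      case Some(txnMetadata) if txnMetadata.producerId != producerId =>
        Left(InvalidProducerIdMapping)
      case Some(txnMetadata) => Right(txnMetadata)
    }
}
```

Note that both branches produce the same error code, so the client cannot tell from the error alone which case it hit; the argument below is that it does not need to.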

      Case 1

      We are missing a value in the transactional state map for the transactional ID. Under normal operation, this is only possible when the transactional ID has expired after transactional.id.expiration.ms of inactivity. In this case, there is no state that needs to be reconciled, so it is safe to simply rebalance and rejoin the group with a new producer. We probably don't even need to close the task dirty, but it doesn't hurt to do so.
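      The expiry that leads to Case 1 can be modelled as a filter over idle time. This Scala sketch assumes expiry depends only on the time since the last update, with `expirationMs` standing in for transactional.id.expiration.ms; it ignores any other conditions the broker may apply, and `TxnMetadata` and `expire` are hypothetical names.

```scala
object TxnIdExpirationSketch {
  // Hypothetical metadata: the producer ID and the last time this ID was used.
  final case class TxnMetadata(producerId: Long, lastUpdateMs: Long)

  // Keep only transactional IDs idle for less than `expirationMs`;
  // anything idle for at least `expirationMs` is dropped from the map.
  def expire(
      states: Map[String, TxnMetadata],
      nowMs: Long,
      expirationMs: Long
  ): Map[String, TxnMetadata] =
    states.filter { case (_, m) => nowMs - m.lastUpdateMs < expirationMs }
}
```

After an entry is dropped, the next lookup for that transactional ID finds nothing, which is exactly the `case None` branch of Case 1. Nothing about the expired producer's work remains on the broker to reconcile.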

      Case 2

      This is a bit more interesting. It says that we have transactional state, but the producer ID in the request does not match the producer ID associated with the transactional ID on the broker. How can this happen?

      It is possible that a new producer instance B with the same transactional ID was created after the transactional state for instance A expired. Since the broker has no state for that transactional ID when B initializes, B gets a completely new producer ID. If the original producer A then comes back, the broker does have state for this transactional ID, but that state is bound to B's producer ID, so A's requests carry the wrong producer ID.

      In this case, the old producer ID is effectively fenced; the normal epoch-based fencing logic just doesn't apply. We can nevertheless treat it the same way.
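      The A/B timeline from Case 2 can be replayed against a toy coordinator. This Scala sketch assumes one producer ID per transactional ID, a simple counter for allocating new producer IDs, and no epochs at all; `Coordinator`, `initProducerId`, `expire`, and `validate` are hypothetical names, not broker APIs.

```scala
object StaleProducerSketch {
  sealed trait CoordinatorError
  case object InvalidProducerIdMapping extends CoordinatorError

  // Minimal, hypothetical coordinator: transactionalId -> producerId, no epochs.
  final class Coordinator {
    private var nextProducerId = 1000L
    private var state = Map.empty[String, Long]

    def initProducerId(transactionalId: String): Long =
      state.get(transactionalId) match {
        case Some(pid) => pid // existing state keeps its producer ID (epoch bump omitted)
        case None =>
          val pid = nextProducerId // no state: allocate a brand-new producer ID
          nextProducerId += 1
          state += transactionalId -> pid
          pid
      }

    // Simulates transactional.id.expiration.ms elapsing for this ID.
    def expire(transactionalId: String): Unit = state -= transactionalId

    def validate(transactionalId: String, producerId: Long): Either[CoordinatorError, Long] =
      state.get(transactionalId) match {
        case None                           => Left(InvalidProducerIdMapping) // Case 1
        case Some(pid) if pid != producerId => Left(InvalidProducerIdMapping) // Case 2
        case Some(pid)                      => Right(pid)
      }
  }
}
```

Replaying the timeline: A initializes, its state expires (A would now hit Case 1), B initializes and receives a new producer ID, and A's stale producer ID then hits Case 2 while B's requests validate normally.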

      Summary

      As described in the cases above, any time we encounter InvalidPidMappingException during normal operation, the previous producer had either finished its work or been fenced. Thus, it is safe to close the task dirty, rebalance, and rejoin the group, just as we do for ProducerFencedException.
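      The proposal amounts to routing one more exception type into the existing fencing path. The Scala sketch below shows that classification step only. To stay dependency-free it uses hypothetical stand-ins `FencedError` and `InvalidPidMappingError` in place of the real `org.apache.kafka.common.errors.ProducerFencedException` and `InvalidPidMappingException`; `Recovery` and `classify` are likewise illustrative names, not Streams APIs.

```scala
object StreamsErrorClassificationSketch {
  // Hypothetical stand-ins for ProducerFencedException and InvalidPidMappingException.
  final class FencedError(msg: String) extends RuntimeException(msg)
  final class InvalidPidMappingError(msg: String) extends RuntimeException(msg)

  sealed trait Recovery
  // Close tasks dirty, recreate the producer, rebalance and rejoin the group.
  case object CloseDirtyAndRejoin extends Recovery
  // Anything else is left to the existing error-handling path.
  case object Rethrow extends Recovery

  // The proposal: InvalidPidMapping takes the same recovery path as ProducerFenced.
  def classify(error: Throwable): Recovery = error match {
    case _: FencedError | _: InvalidPidMappingError => CloseDirtyAndRejoin
    case _                                          => Rethrow
  }
}
```

Keeping the decision in a single match makes the change small: the recovery path itself is unchanged, and only the set of exceptions that reach it grows.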


            People

              Assignee: Walker Carlson (wcarlson5)
              Reporter: Walker Carlson (wcarlson5)