KAFKA-5494 (sub-task of KAFKA-5795: Make the idempotent producer the default producer setting)

Idempotent producer should not require max.in.flight.requests.per.connection=1



    • Sub-task
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 1.0.0
    • None


      Currently, the idempotent producer (and hence transactional producer) requires max.in.flight.requests.per.connection=1.

      This restriction was introduced to simplify the implementation on the client and server. With some additional work, we can satisfy the idempotence guarantees with any number of in-flight requests. The changes on the client can be summarized as follows:

      1. We increment sequence numbers when batches are drained.
      2. If, for some reason, a batch fails with a retriable error, we know that all subsequent batches would fail with an out of order sequence exception.
      3. As such, the client should treat some OutOfOrderSequence errors as retriable. In particular, we should maintain the 'last acked sequence'. If the batch immediately following the last acked sequence fails with OutOfOrderSequence, that is a fatal error. If a later batch fails with OutOfOrderSequence, it should be re-enqueued.
      4. With the changes above, the producer queues should become priority queues ordered by sequence number.
      5. The partition is not ready unless the front of the queue has the next expected sequence.
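
The five client-side changes can be illustrated with a small, self-contained model. This is only a sketch of one possible reading of the steps, not the actual producer code: the names `PartitionQueue`, `FatalSequenceError`, `on_ack`, `on_retriable_error` and `on_out_of_order` are hypothetical, batches are reduced to `(sequence, payload)` tuples, and "next expected sequence" is assumed to mean the last acked sequence plus one.

```python
import heapq
from collections import deque


class FatalSequenceError(Exception):
    """Raised when the batch right after the last acked one is out of order."""


class PartitionQueue:
    """Hypothetical model of a single partition's producer queue."""

    def __init__(self):
        self.next_sequence = 0   # next sequence to assign at drain time (step 1)
        self.last_acked = -1     # last acknowledged sequence (step 3)
        self._new = deque()      # payloads that have not been assigned a sequence
        self._retries = []       # min-heap of (sequence, payload) tuples (step 4)

    def append(self, payload):
        self._new.append(payload)

    def is_ready(self):
        # Step 5: a re-enqueued batch at the front may only be sent if it
        # carries the next expected sequence (assumed: last_acked + 1).
        if self._retries:
            return self._retries[0][0] == self.last_acked + 1
        return bool(self._new)

    def drain(self):
        """Return the next (sequence, payload) to send, or None if not ready."""
        if not self.is_ready():
            return None
        if self._retries:
            return heapq.heappop(self._retries)
        sequence = self.next_sequence   # step 1: assign the sequence on drain
        self.next_sequence += 1
        return (sequence, self._new.popleft())

    def on_ack(self, sequence):
        self.last_acked = max(self.last_acked, sequence)

    def on_retriable_error(self, sequence, payload):
        heapq.heappush(self._retries, (sequence, payload))

    def on_out_of_order(self, sequence, payload):
        # Step 3: fatal only for the batch immediately after the last ack.
        if sequence == self.last_acked + 1:
            raise FatalSequenceError(f"sequence {sequence} rejected")
        heapq.heappush(self._retries, (sequence, payload))
```

In this model, three batches drained as sequences 0, 1 and 2 can all be in flight at once. If 0 fails with a retriable error and 1 and 2 come back with OutOfOrderSequence, all three are re-enqueued, `drain()` hands out 0 first, and it returns `None` until 0 has been acked, which is the single in-flight constraint during recovery.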

      With the changes above, we would get the benefits of multiple inflights in normal cases. When there are failures, we automatically constrain to a single inflight until we get back in sequence.

      With multiple in-flight requests, duplicates are now possible for batches other than the last appended batch. To return the record metadata (including offset) of such duplicates, we would need to scan the tail of the log. This can be optimized by caching the metadata for the last 'n' batches. For instance, if the default max.inflight is 5, we could cache the record metadata of the last 5 batches, and fall back to a scan if the duplicate is not within those 5.
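
The caching idea can be sketched as a bounded cache in front of a slower scan. This is an illustrative model only, not the broker implementation: `ProducerBatchLog`, `find_duplicate` and the `scans` counter are hypothetical names, the log is a plain Python list for a single producer, and each batch is assumed to hold one record so that its offset equals its position in the list.

```python
from collections import deque


class ProducerBatchLog:
    """Hypothetical stand-in for one producer's batches in a partition log."""

    def __init__(self, cache_size=5):
        self._log = []                            # (sequence, offset) per batch
        self._recent = deque(maxlen=cache_size)   # metadata of the last n batches
        self.scans = 0                            # number of fallback scans

    def append(self, sequence):
        offset = len(self._log)   # assumption: one record per batch
        entry = (sequence, offset)
        self._log.append(entry)
        self._recent.append(entry)   # the oldest entry is evicted automatically
        return offset

    def find_duplicate(self, sequence):
        """Return the offset of an already appended batch, or None."""
        # Fast path: look in the cached metadata of the last n batches.
        for seq, offset in self._recent:
            if seq == sequence:
                return offset
        # Slow path: scan the log from the tail backwards.
        self.scans += 1
        for seq, offset in reversed(self._log):
            if seq == sequence:
                return offset
        return None
```

With `cache_size=5` and eight appended batches, a duplicate of sequence 7 is answered from the cache, while a duplicate of sequence 1 falls through to the scan.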





            apurva Apurva Mehta