Kafka / KAFKA-12207

Do not maintain list of latest producer append information



    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: core


      For each producerId writing to each partition, we maintain a list of the 5 most recent appended sequence numbers and the corresponding offsets in the log. If a producer fails to receive a successful response and retries the Produce request, then we can still return the offset of the successful append, which is returned to the user inside `RecordMetadata`. (Note that the limit of 5 most recent appends is where we derive the limit on the max number of inflight requests that the producer is allowed when idempotence is enabled.)
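
      To make the bookkeeping concrete, here is a minimal sketch of a bounded cache of recent appends for a single producerId on a single partition. The class and method names (`RecentAppends`, `recordAppend`, `findDuplicate`) are hypothetical and are not the broker's actual implementation. The sketch also assumes a retried batch carries exactly the same first and last sequence as the original.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalLong;

// Hypothetical sketch: per-producer cache of the most recent appends.
final class RecentAppends {
    // The limit of 5 cached appends described in this issue.
    static final int MAX_CACHED = 5;

    // One cached append: the batch's sequence range and the offset of its last record.
    private static final class Entry {
        final int firstSeq;
        final int lastSeq;
        final long lastOffset;

        Entry(int firstSeq, int lastSeq, long lastOffset) {
            this.firstSeq = firstSeq;
            this.lastSeq = lastSeq;
            this.lastOffset = lastOffset;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();

    // Remember a successful append, evicting the oldest entry once the cache is full.
    void recordAppend(int firstSeq, int lastSeq, long lastOffset) {
        if (entries.size() == MAX_CACHED) {
            entries.removeFirst();
        }
        entries.addLast(new Entry(firstSeq, lastSeq, lastOffset));
    }

    // If a retried batch matches a cached append, return the offset of the original append.
    OptionalLong findDuplicate(int firstSeq, int lastSeq) {
        for (Entry e : entries) {
            if (e.firstSeq == firstSeq && e.lastSeq == lastSeq) {
                return OptionalLong.of(e.lastOffset);
            }
        }
        return OptionalLong.empty();
    }
}
```

      A retry that matches one of the cached entries can be answered with the original offset. A retry of anything older than the cached entries finds nothing, which is the gap the rest of this issue discusses.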

      This is only a "best-effort" attempt to return the offset of the append. For example, we do not populate the full list of recent appends when the log is reloaded. Only the latest sequence/offset is reloaded from the snapshot. If we receive a duplicate and we do not have the offset, then the broker currently handles this by returning OUT_OF_ORDER_SEQUENCE_NUMBER.

      In fact, we have a separate error DUPLICATE_SEQUENCE_NUMBER which was intended to handle this case and the producer already checks for it. If the producer sees this error in the response, then the `send` is considered successful, but the producer returns -1 as both the offset and timestamp inside `RecordMetadata`.

      The reason we never implemented this on the broker is probably that we allow the producer's sequence numbers to wrap around after reaching Int.MaxValue. What we considered in the past was fixing a number like 1000 and requiring that the sequence be within that range of the latest sequence to be considered a duplicate. A better solution going forward is to let the producer bump the epoch when the sequence hits Int.MaxValue, but we still have to allow sequence numbers to wrap for compatibility.

      Given the loose guarantees that we already have here, I'm considering whether the additional bookkeeping and the required memory are worth preserving. As an alternative, we could consider the following:

      1. The broker will only maintain the latest sequence/offset for each producerId.
      2. We will return DUPLICATE_SEQUENCE_NUMBER for any sequence that is within 1000 of the latest sequence (accounting for overflow).
      3. Instead of wrapping around sequence numbers, the producer will bump the epoch if possible. It's worth noting that the idempotent producer can freely bump the epoch, so the only time we should ever need to wrap the sequence is for the transactional producer when it is used on a broker which does not support the `InitProducerId` version which allows epoch bumps.
      4. We can remove the restriction on `max.in.flight.requests.per.connection` and document that if the offset is required in `RecordMetadata`, then the user must set this to 1. Internally, if connecting to an old broker which does not support epoch bumps, then we can restrict the number of inflight requests to 5.
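
      As a rough illustration of items 1 and 2, the check below classifies an incoming sequence against only the latest sequence for a producer, treating anything up to 1000 behind it (with wrap-around) as a duplicate. This is a sketch under stated assumptions, not a proposed implementation: the names `SequenceCheck`, `classify`, and `distanceBehind` are hypothetical, each batch is simplified to a single sequence number, "within 1000" is read as inclusive, and sequences are assumed to wrap from Integer.MAX_VALUE back to 0.

```java
// Hypothetical sketch: duplicate detection using only the latest sequence.
final class SequenceCheck {
    enum Result { IN_SEQUENCE, DUPLICATE, OUT_OF_ORDER }

    // Window size taken from the proposal above; the inclusive bound is an assumption.
    static final int DUPLICATE_WINDOW = 1000;

    // Size of the wrapping sequence space [0, Integer.MAX_VALUE].
    private static final long SPACE = (long) Integer.MAX_VALUE + 1;

    // Number of steps `seq` lies behind `latest`, counting backwards with wrap-around.
    static long distanceBehind(int latest, int seq) {
        return Math.floorMod((long) latest - (long) seq, SPACE);
    }

    static Result classify(int latestSeq, int incomingSeq) {
        // The next expected sequence wraps to 0 after Integer.MAX_VALUE.
        int expected = (latestSeq == Integer.MAX_VALUE) ? 0 : latestSeq + 1;
        if (incomingSeq == expected) {
            return Result.IN_SEQUENCE;
        }
        if (distanceBehind(latestSeq, incomingSeq) <= DUPLICATE_WINDOW) {
            // Would map to DUPLICATE_SEQUENCE_NUMBER; no offset is available to return.
            return Result.DUPLICATE;
        }
        return Result.OUT_OF_ORDER;
    }
}
```

      For example, with a latest sequence of 5, an incoming sequence of Integer.MAX_VALUE is 6 steps behind after wrapping and so classifies as a duplicate, while with a latest sequence of 5000 an incoming sequence of 10 falls outside the window and is treated as out of order.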

      The benefit in the end is that we can reduce both the memory used for producer state and the complexity of managing that state. It also gives us a path to removing the annoying restriction on `max.in.flight.requests.per.connection` and a better policy for sequence overflow.





            Assignee: Unassigned
            Reporter: Jason Gustafson (hachikuji)



