Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.11.0.0
    • Fix Version/s: 1.0.0
    • Component/s: None
    • Labels:

      Description

      Currently, the idempotent producer (and hence transactional producer) requires max.in.flight.requests.per.connection=1.

      This was done to simplify the implementation on both the client and the server. With some additional work, we can satisfy the idempotence guarantees with any number of in-flight requests. The changes on the client can be summarized as follows:

      1. We increment sequence numbers when batches are drained.
      2. If a batch fails with a retriable error for some reason, we know that all subsequent in-flight batches will fail with an OutOfOrderSequence exception.
      3. As such, the client should treat some OutOfOrderSequence errors as retriable. In particular, we should track the 'last acked sequence'. If the batch immediately following the last acked sequence fails with OutOfOrderSequence, that is a fatal error. If a later batch fails with OutOfOrderSequence, it should be re-enqueued.
      4. With the changes above, the producer queues should become priority queues ordered by sequence number.
      5. The partition is not ready unless the front of the queue has the next expected sequence.
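      Item 3 boils down to one comparison against the last acked sequence. A minimal Python sketch, assuming per-record sequence numbers where a batch is identified by the sequence of its first record; `PartitionState`, `Batch` and `classify_out_of_order` are hypothetical names, not the Kafka client's API:

```python
from dataclasses import dataclass


@dataclass
class PartitionState:
    # Highest sequence number acknowledged by the broker; -1 means nothing acked yet.
    last_acked_sequence: int = -1


@dataclass
class Batch:
    base_sequence: int  # sequence number of the first record in the batch
    record_count: int


def classify_out_of_order(state: PartitionState, batch: Batch) -> str:
    """Decide how to handle an OutOfOrderSequence response for `batch`.

    Returns "fatal" if the batch is the one immediately after the last acked
    sequence (the broker really is missing data), otherwise "retry" (an earlier
    in-flight batch failed, so this one arrived out of order and can be re-enqueued).
    """
    if batch.base_sequence == state.last_acked_sequence + 1:
        return "fatal"
    return "retry"


# Sequences 0..9 are acked; batches starting at 10 and 15 get OutOfOrderSequence.
state = PartitionState(last_acked_sequence=9)
print(classify_out_of_order(state, Batch(10, 5)))  # fatal
print(classify_out_of_order(state, Batch(15, 5)))  # retry
```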

      With the changes above, we get the benefits of multiple in-flight requests in the normal case. When there are failures, we automatically constrain the producer to a single in-flight request until the sequence is back in order.
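      Items 4 and 5 of the list above can be sketched as a per-partition min-heap keyed on sequence number, plus a readiness check on its head. This is a minimal Python sketch, assuming retried batches keep their original sequences; `QueuedBatch`, `PartitionQueue` and its methods are hypothetical names, and the "single in-flight request" limit itself is not modeled:

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class QueuedBatch:
    base_sequence: int                        # the only field used for ordering
    record_count: int = field(compare=False)


class PartitionQueue:
    """Per-partition queue ordered by sequence number (item 4)."""

    def __init__(self, next_expected_sequence: int) -> None:
        self._heap = []  # min-heap keyed on base_sequence
        self.next_expected_sequence = next_expected_sequence

    def enqueue(self, batch: QueuedBatch) -> None:
        heapq.heappush(self._heap, batch)

    def is_ready(self) -> bool:
        # Item 5: ready only if the head of the queue carries the next expected sequence.
        return bool(self._heap) and self._heap[0].base_sequence == self.next_expected_sequence

    def drain(self) -> QueuedBatch:
        if not self.is_ready():
            raise RuntimeError("partition is not ready to send")
        batch = heapq.heappop(self._heap)
        self.next_expected_sequence = batch.base_sequence + batch.record_count
        return batch


# Retries for the batches at 15 and 10 come back out of order.
q = PartitionQueue(next_expected_sequence=10)
q.enqueue(QueuedBatch(15, 5))
print(q.is_ready())  # False: the batch at 10 has not been re-enqueued yet
q.enqueue(QueuedBatch(10, 5))
print(q.drain().base_sequence, q.drain().base_sequence)  # 10 15
```

      Ordering by sequence rather than arrival order is what lets re-enqueued batches slot back in front of later ones regardless of the order in which their error responses arrive.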

      With multiple in-flight requests, duplicates are now possible for batches other than the last appended batch. To return the record metadata (including the offset) of such a duplicate, the broker would have to scan the tail of the log to find it. This can be optimized by caching the metadata for the last 'n' batches. For instance, if the default max.inflight is 5, we could cache the record metadata of the last 5 batches and fall back to a scan only if the duplicate is not among them.
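      The caching idea can be sketched as a bounded deque per producer with a fallback to a log scan. A minimal Python sketch, assuming a duplicate is recognized by an exact match on its first and last sequence; `BatchMetadata`, `RecentBatchCache` and the `scan_log` callback are hypothetical stand-ins for the broker's producer state and log scan:

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class BatchMetadata:
    first_seq: int
    last_seq: int
    last_offset: int  # offset to hand back to the producer for this batch


class RecentBatchCache:
    """Keeps metadata for the last `capacity` batches appended by one producer."""

    def __init__(self, capacity: int = 5) -> None:
        self._recent = deque(maxlen=capacity)  # oldest entry is evicted automatically

    def on_append(self, meta: BatchMetadata) -> None:
        self._recent.append(meta)

    def find_duplicate(
        self,
        first_seq: int,
        last_seq: int,
        scan_log: Callable[[int, int], Optional[BatchMetadata]],
    ) -> Optional[BatchMetadata]:
        for meta in self._recent:
            if meta.first_seq == first_seq and meta.last_seq == last_seq:
                return meta  # cache hit: no log scan needed
        return scan_log(first_seq, last_seq)  # fall back to scanning the log tail


cache = RecentBatchCache(capacity=5)
for i in range(7):  # append 7 batches of 10 records each; the first 2 get evicted
    cache.on_append(BatchMetadata(i * 10, i * 10 + 9, 1000 + i * 10 + 9))
print(cache.find_duplicate(60, 69, lambda first, last: None))  # served from the cache
print(cache.find_duplicate(0, 9, lambda first, last: None))    # evicted, so the scan runs
```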

        Issue Links

          Activity

          Apurva Mehta added a comment - cc Jason Gustafson
          Jay Kreps added a comment - edited

          Yeah, I think it'd be a fantastic outcome to eliminate a lot of these configs as things you ever need to think about. Ideally, by default:

          1. idempotent=true
          2. retries=infinite
          3. acks=all
          4. max.inflight=2 (or whatever)
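          The proposed defaults interact: idempotence only holds if writes are fully acknowledged, failed batches are retried, and the number of in-flight batches does not exceed what the broker can deduplicate. A minimal Python sketch expressing the list above with producer config keys; `idempotence_problems` is a hypothetical helper, the in-flight limit of 5 is an assumption tied to the 5 cached batches discussed in this issue, and the fallback values for missing keys are illustrative:

```python
# Hypothetical helper: checks the settings an idempotent producer relies on.
MAX_INFLIGHT_FOR_IDEMPOTENCE = 5  # assumption: matches the 5 batches cached per producer

ACKS_PROBLEM = "acks must be 'all'"
RETRIES_PROBLEM = "retries must be greater than 0"
INFLIGHT_PROBLEM = (
    f"max.in.flight.requests.per.connection must be between 1 and {MAX_INFLIGHT_FOR_IDEMPOTENCE}"
)


def idempotence_problems(cfg: dict) -> list:
    """Return the reasons `cfg` cannot safely enable idempotence (empty list = OK)."""
    if str(cfg.get("enable.idempotence", "false")).lower() != "true":
        return []  # idempotence is off, nothing to check
    problems = []
    if str(cfg.get("acks", "1")) not in ("all", "-1"):
        problems.append(ACKS_PROBLEM)
    if int(cfg.get("retries", 0)) <= 0:
        problems.append(RETRIES_PROBLEM)
    inflight = int(cfg.get("max.in.flight.requests.per.connection", 5))
    if not 1 <= inflight <= MAX_INFLIGHT_FOR_IDEMPOTENCE:
        problems.append(INFLIGHT_PROBLEM)
    return problems


# The defaults proposed above, written with producer config keys.
proposed_defaults = {
    "enable.idempotence": "true",
    "retries": 2**31 - 1,  # stand-in for "infinite" (largest 32-bit signed int)
    "acks": "all",
    "max.in.flight.requests.per.connection": 2,
}
print(idempotence_problems(proposed_defaults))  # []
```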

          Various chunks of work to make each of these possible:

          1. I think enabling idempotence and retries=infinite by default is just a comfort thing since this is new
          2. For max.inflight, I vaguely recall that the reason I added that config was that it helped performance a small amount. With only 1, you potentially leave the server idle after the response is sent but before the client has sent its next request. But it'd be good to sanity-check that max.inflight > 1 actually helps perf.
          3. acks=all: currently I believe this is about 2x slower than acks=1. At least this was the case when we did producer benchmarking a few years back, but no one has ever actually looked at why. The knee-jerk reaction is that "of course acks=all is slower since it's waiting for replication". But if you think about it, that isn't obviously the case. Intuitively, yes, the requests take longer to process, but this should just lead to more batching, which should be able to make up for it. So there is a 50% chance that this is just some perf issue with purgatory or with the client, and fixing that could enable us to default to the stronger semantics (which would be great).
          Ismael Juma added a comment -

          Yes, it would be indeed awesome if people didn't have to think about all these settings. If that is not possible, it would still be pretty good if most people didn't have to think about them. That would leave a minority of power users that would tweak things to satisfy their unusual needs.

          Apurva Mehta added a comment -

          I created a separate JIRA to track the progress on making acks=all the default. You can find it here: https://issues.apache.org/jira/browse/KAFKA-5640

          ASF GitHub Bot added a comment -

          GitHub user apurvam opened a pull request:

          https://github.com/apache/kafka/pull/3743

          KAFKA-5494: enable idempotence with max.in.flight.requests.per.connection > 1

          Here we introduce client and broker changes to support multiple inflight requests while still guaranteeing idempotence. Two major problems to be solved:

          1. Sequence number management on the client when there are request failures. When a batch fails, subsequent in-flight batches will also fail with `OutOfOrderSequenceException`. This must be handled on the client with intelligent sequence reassignment. We must also deal with the fatal failure of a batch: the subsequent batches must get different sequence numbers when they come back.
          2. On the broker, when we have multiple inflights, we can get duplicates of multiple old batches. With this patch, we retain the record metadata for 5 older batches.
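          For the fatal-failure case in point 1, one way to give later batches "different sequence numbers" is to close the gap left by the failed batch. A minimal Python sketch, assuming per-record sequence numbers and that the broker never wrote the failed batch (otherwise shifting would create duplicates); `InflightBatch` and `resequence_after_fatal_failure` are hypothetical names, not the patch's code:

```python
from dataclasses import dataclass


@dataclass
class InflightBatch:
    base_sequence: int
    record_count: int


def resequence_after_fatal_failure(failed, inflight, next_sequence):
    """Drop `failed` and close the sequence gap it leaves behind.

    Every in-flight batch after the failed one moves down by
    `failed.record_count` before it is retried.
    Returns (surviving batches, new next sequence to assign).
    """
    survivors = [b for b in inflight if b is not failed]
    for b in survivors:
        if b.base_sequence > failed.base_sequence:
            b.base_sequence -= failed.record_count
    return survivors, next_sequence - failed.record_count


# The batch at 10 (5 records) fails fatally while 5, 15 and 20 are in flight.
failed = InflightBatch(10, 5)
batches = [InflightBatch(5, 5), failed, InflightBatch(15, 5), InflightBatch(20, 3)]
survivors, nxt = resequence_after_fatal_failure(failed, batches, 23)
print([b.base_sequence for b in survivors], nxt)  # [5, 10, 15] 18
```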

          I have added `TODO(reviewers)` comments for specific decisions in the code which are worth discussing.

          TODO:
          1. Add more unit tests, especially for loading different snapshot versions correctly, more client side unit tests, more broker side tests to validate that we are caching the correct number of batches (some of this is already there).
          2. Update the system tests to check for ordering.
          3. Run a tight loop of system tests.
          4. Add comments about the assumptions made around the network client semantics of send/receive.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/apurvam/kafka KAFKA-5494-increase-max-in-flight-for-idempotent-producer

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/3743.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3743


          commit 005eee527ab425d8e3d8678aad4b5305cde6ca08
          Author: Apurva Mehta <apurva@confluent.io>
          Date: 2017-08-12T00:25:06Z

          Initial commit of client side changes with some tests

          commit 63bf074a38ec3efef728863081805a36d9111038
          Author: Apurva Mehta <apurva@confluent.io>
          Date: 2017-08-17T23:49:10Z

          Implemented broker side changes to cache extra metadata.

          Todo:
          1) Write more unit tests.
          2) Handle deletion / retention / cleaning correctly.

          commit 1ad49f30f03ff665f5657680cbcc5e045210ce45
          Author: Apurva Mehta <apurva@confluent.io>
          Date: 2017-08-23T00:10:39Z

          Change the client side code so that the sequence numbers are assigned
          and incremented during drain. If a batch is retried, its sequence
          number is unset during the completion handler. If the first inflight
          batch returns an error, the next sequence to assign is reset to the last
          ack'd sequence + 1.
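          The bookkeeping in this commit message can be sketched per partition. A minimal Python sketch, assuming per-record sequence numbers and that retried batches are drained again in their original order; `DrainBatch`, `SequenceTracker` and its methods are hypothetical names, not the producer's actual classes:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DrainBatch:
    record_count: int
    base_sequence: Optional[int] = None  # None = no sequence assigned yet


class SequenceTracker:
    """Per-partition sequence bookkeeping as described in the commit message."""

    def __init__(self) -> None:
        self.next_sequence = 0
        self.last_acked_sequence = -1

    def on_drain(self, batch: DrainBatch) -> None:
        # Sequences are assigned and incremented only when a batch is drained.
        if batch.base_sequence is None:
            batch.base_sequence = self.next_sequence
            self.next_sequence += batch.record_count

    def on_ack(self, batch: DrainBatch) -> None:
        last = batch.base_sequence + batch.record_count - 1
        self.last_acked_sequence = max(self.last_acked_sequence, last)

    def on_retriable_error(self, batch: DrainBatch, is_first_inflight: bool) -> None:
        if is_first_inflight:
            # The first in-flight batch failed: restart from last ack'd sequence + 1.
            self.next_sequence = self.last_acked_sequence + 1
        batch.base_sequence = None  # unset so the next drain reassigns it


t = SequenceTracker()
a, b, c = DrainBatch(5), DrainBatch(5), DrainBatch(3)
for batch in (a, b, c):
    t.on_drain(batch)  # a -> 0, b -> 5, c -> 10
t.on_ack(a)            # sequences 0..4 acknowledged
t.on_retriable_error(b, is_first_inflight=True)
t.on_retriable_error(c, is_first_inflight=False)
for batch in (b, c):
    t.on_drain(batch)  # retried in order: b -> 5, c -> 10
print(b.base_sequence, c.base_sequence, t.next_sequence)  # 5 10 13
```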

          commit d9b86b7cb8e7001a7d5fc42a2ec061ebd0332a6a
          Author: Apurva Mehta <apurva@confluent.io>
          Date: 2017-08-24T01:33:54Z

          WIP

          commit 9ff885fe6db7172d28ea8fe406972a7763c0a49d
          Author: Apurva Mehta <apurva@confluent.io>
          Date: 2017-08-25T06:23:50Z

          Implemented log cleaning functionality with tests

          commit 5508a194c74a8946a8451c01814324e6ba788cfe
          Author: Apurva Mehta <apurva@confluent.io>
          Date: 2017-08-25T19:27:03Z

          Fix merge issues after rebasing onto trunk


          Apurva Mehta added a comment -

          I wrote up a short description of the solution's design here: https://docs.google.com/document/d/1EBt5rDfsvpK6mAPOOWjxa9vY0hJ0s9Jx9Wpwciy0aVo/edit

          Jason Gustafson added a comment -

          Issue resolved by pull request 3743
          https://github.com/apache/kafka/pull/3743

          ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/kafka/pull/3743


            People

            • Assignee:
              Apurva Mehta
              Reporter:
              Apurva Mehta
            • Votes:
              1
              Watchers:
              11

              Dates

              • Created:
                Updated:
                Resolved:
