Kafka / KAFKA-7636

Allow consumer to update maxPollRecords value



    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 2.0.1
    • Fix Version/s: None
    • Component/s: clients



      We have two use cases where we need to change the max.poll.records parameter on the fly:

      1. We offer a REST API to get 'feedbacks'. This API accepts a 'count' parameter.
      The system was previously based on Cassandra. It is now based on Kafka, and 'feedbacks' are stored in Kafka topics.
      To remain compliant with the legacy interface contract, we would like to change max.poll.records on the fly so that each poll honors this 'count' parameter.

      2. We receive 'notification requests' related to a 'sender' via a REST API. We store those requests into topics (by sender).
      Each sender is associated with a weight. Here is the algorithm that processes the requests:

          1. At each iteration, we process at most n records (n is configurable) across all requests. For this example, let's say n = 100.
          2. We compute the max poll records for each sender. Let's say we have 3 senders with weights 2, 1, and 1. Hence 50 records max for the first one and 25 for each of the other two.
          3. We consume the topics one after the other. We would like to reallocate some capacity to the remaining consumers if max.poll.records is not reached for the current consumer. Let's say at each iteration we make the following synchronous calls:
                  sender1Consumer.poll() with computed max.poll.records 50
                  sender2Consumer.poll() with computed max.poll.records 25
                  sender3Consumer.poll() with computed max.poll.records 25
             If the first call returns only 10 records, we would like to reallocate the 40 "spare" records to the other consumers, 20 each for instance (or using another strategy). We would then make the following calls instead:
                  sender2Consumer.poll() with updated max.poll.records 45
                  sender3Consumer.poll() with updated max.poll.records 45


      For this requirement, we also need to change max.poll.records on the fly.
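
      The quota arithmetic from the second use case can be sketched in plain Java. This is only an illustration of the numbers in the example (100 records, weights 2, 1, 1, first poll returning 10 records); the class PollBudget and its methods are hypothetical names, not part of the Kafka client API, and the sketch assumes the "spare" records are split evenly among the consumers not yet polled. It does not show how the computed value would be applied to a consumer, since that is what this issue asks for.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration only; not part of the Kafka client API. */
final class PollBudget {

    /** Splits totalBudget proportionally to the sender weights (integer division). */
    public static int[] initialQuotas(int totalBudget, int[] weights) {
        int weightSum = Arrays.stream(weights).sum();
        int[] quotas = new int[weights.length];
        for (int i = 0; i < weights.length; i++) {
            quotas[i] = totalBudget * weights[i] / weightSum;
        }
        return quotas;
    }

    /**
     * After the consumer at position index returned `received` records,
     * spreads its unused quota evenly over the consumers not yet polled.
     * Assumption: even split; any other strategy could be substituted.
     */
    public static int[] reallocate(int[] quotas, int index, int received) {
        int[] updated = quotas.clone();
        int remaining = quotas.length - index - 1;
        int spare = quotas[index] - received;
        if (remaining == 0 || spare <= 0) {
            return updated; // nothing to hand over
        }
        updated[index] = received;
        int share = spare / remaining;
        for (int i = index + 1; i < quotas.length; i++) {
            updated[i] += share;
        }
        return updated;
    }

    public static void main(String[] args) {
        int[] quotas = initialQuotas(100, new int[] {2, 1, 1});
        System.out.println(Arrays.toString(quotas));   // [50, 25, 25]
        int[] updated = reallocate(quotas, 0, 10);
        System.out.println(Arrays.toString(updated));  // [10, 45, 45]
    }
}
```

      With the values from the example, sender2Consumer and sender3Consumer would each be polled with a limit of 45 instead of 25.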


      PR: https://github.com/apache/kafka/pull/5919



              • Assignee:
                Kcirtap7 Kcirtap Seven
              • Votes:
                0
              • Watchers:
                2


                • Created: