Kafka / KAFKA-13463

Improvement: KafkaConsumer pause(Collection<TopicPartition> partitions)





      When using the KafkaConsumer#pause(...) method, users may overlook one thing: the pause can silently stop working, poll can return records again, and data may be lost.

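      For example, consider the following simple code:

      while (true) {
          try {
              // Pause all currently assigned partitions before every poll.
              kafkaConsumer.pause(kafkaConsumer.assignment());
              ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
              if (!records.isEmpty()) {
                  // Records were returned although every partition was paused.
                  log.error("kafka poll for rebalance discard some record!");
              }
          } catch (Exception e) {
              log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
          }
      }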

      Even if you call pause(assignment) before the poll method every time, the poll method may still return messages.


      2. Root cause:

      In short, during a group rebalance, ConsumerCoordinator#invokePartitionsRevoked(...) clears the paused mark on the partitions previously held by the KafkaConsumer. However, the messages already buffered in memory for those paused partitions (Fetcher.completedFetches) are not cleared at the same time, so Fetcher#fetchedRecords() still picks them up and returns them to the user.
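The sequence described above can be reproduced with a small toy model. This is only a sketch: `PausedLeakModel` and all of its members are hypothetical names invented for illustration, partitions are plain strings, and none of this is Kafka's actual internal code. It shows only the interaction between a cleared paused mark and records that stay buffered.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: hypothetical class, not Kafka's real internals.
class PausedLeakModel {
    final Set<String> assigned = new HashSet<>();
    final Set<String> paused = new HashSet<>();
    // Stands in for Fetcher.completedFetches: partition -> buffered records.
    final Map<String, List<String>> buffered = new LinkedHashMap<>();

    void assign(String partition) { assigned.add(partition); }

    void pause(String partition) { paused.add(partition); }

    // A fetch response that arrived earlier and is still held in memory.
    void buffer(String partition, String record) {
        buffered.computeIfAbsent(partition, k -> new ArrayList<>()).add(record);
    }

    // Rebalance in which the partitions are revoked and then assigned again:
    // the paused marks are cleared, but the buffered records are kept.
    void rebalance() {
        paused.clear();
    }

    // A rebalance may run inside poll, before any records are returned.
    List<String> poll(boolean rebalanceDuringPoll) {
        if (rebalanceDuringPoll) {
            rebalance();
        }
        List<String> out = new ArrayList<>();
        // Iterate over a copy of the keys because entries are removed in the loop.
        for (String partition : new ArrayList<>(buffered.keySet())) {
            if (assigned.contains(partition) && !paused.contains(partition)) {
                out.addAll(buffered.remove(partition));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        PausedLeakModel consumer = new PausedLeakModel();
        consumer.assign("topic-0");
        consumer.buffer("topic-0", "record-1");

        consumer.pause("topic-0");
        System.out.println(consumer.poll(false)); // prints []

        consumer.pause("topic-0");
        System.out.println(consumer.poll(true));  // prints [record-1]
    }
}
```

In the second call the application paused the partition right before polling, yet the record is still returned, because the paused mark was cleared inside poll while the buffer was left untouched.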

      For a more detailed analysis, see https://issues.apache.org/jira/browse/KAFKA-13425

      Looking forward to your reply.


      3. Discussion: Can KafkaConsumer support a pause method that is not affected by group rebalance?

      The original design of the KafkaConsumer#pause method stated one point explicitly:

      • Rebalance does not preserve pause/resume state.


      Unfortunately, I did not find this stated in the comments of the KafkaConsumer#pause(...) method. In addition, ConsumerCoordinator#invokePartitionsRevoked does not log anything when it clears the paused mark. I believe this will lead many users to use the KafkaConsumer#pause(...) method incorrectly.

      Still, I think KafkaConsumer needs to provide a pause method that is not affected by group rebalance.


      4. Suggestions

      I suggest improving the existing pause method from several different perspectives, or providing some new pause methods. Each point below is an independent solution.

      1) When clearing the paused mark, ConsumerCoordinator#invokePartitionsRevoked should also trigger the Fetcher to discard the in-memory messages of the revoked and paused partitions (revokedAndPausedPartitions)

      This prevents Fetcher#fetchedRecords() from mistakenly treating revokedAndPausedPartitions as fetchable and returning their messages. The fetchedRecords method already performs various checks on each partition.

      The cost is that if the user does not call pause(...) again before the next poll, a new Fetch request may be sent, which causes additional network traffic.
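Suggestion 1) and its cost can be sketched with a toy model. `PurgeOnRevokeModel`, `onPartitionsRevoked` as written here, and `needsNewFetch` are hypothetical names chosen for illustration and partitions are plain strings; this is an assumption about the shape of the change, not the actual Fetcher or ConsumerCoordinator code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: hypothetical names, not Kafka's real internals.
class PurgeOnRevokeModel {
    final Set<String> assigned = new HashSet<>();
    final Set<String> paused = new HashSet<>();
    // Stands in for the Fetcher's in-memory buffer: partition -> records.
    final Map<String, List<String>> buffered = new HashMap<>();

    void assign(String partition) { assigned.add(partition); }

    void pause(String partition) { paused.add(partition); }

    void buffer(String partition, String record) {
        buffered.computeIfAbsent(partition, k -> new ArrayList<>()).add(record);
    }

    // Suggested behavior: when a revoked partition was paused, drop its
    // buffered records together with the paused mark.
    void onPartitionsRevoked(Collection<String> revoked) {
        for (String partition : revoked) {
            if (paused.remove(partition)) {
                buffered.remove(partition);
            }
            assigned.remove(partition);
        }
    }

    // Returns buffered records of partitions that are assigned and not paused.
    List<String> fetchedRecords() {
        List<String> out = new ArrayList<>();
        for (String partition : new ArrayList<>(buffered.keySet())) {
            if (assigned.contains(partition) && !paused.contains(partition)) {
                out.addAll(buffered.remove(partition));
            }
        }
        return out;
    }

    // The cost: an unpaused partition with nothing buffered needs a new Fetch request.
    boolean needsNewFetch(String partition) {
        return assigned.contains(partition)
                && !paused.contains(partition)
                && !buffered.containsKey(partition);
    }
}
```

After a paused partition is revoked and assigned again, `fetchedRecords()` returns nothing for it, and `needsNewFetch` reports `true` unless the user pauses the partition again first.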


      2) Preserve the old paused mark on the KafkaConsumer side

      <1> In the ConsumerCoordinator#onJoinPrepare(...) method, record all pausedTopicPartitions from the current assignment of the KafkaConsumer;

      <2> In the ConsumerCoordinator#onJoinComplete(...) method, apply pausedTopicPartitions to the latest assignment and restore the paused marks of the partitions that are still in it.

      Note: If the new assignment of the KafkaConsumer no longer contains topicPartitions that were paused before the rebalance, the paused mark of these topicPartitions is lost permanently on the KafkaConsumer side, even if a future rebalance assigns these partitions to the same KafkaConsumer again.

      At the end of KAFKA-13425, mentioned above, I gave a draft code suggestion for this point.

      <3> In fact, code analysis shows that consumers using the RebalanceProtocol.COOPERATIVE protocol (for example, with the currently supported PartitionAssignor CooperativeStickyAssignor) keep the old paused mark by default, whereas consumers using the RebalanceProtocol.EAGER protocol clear all paused marks by default.

      I suggest that KafkaConsumer behave consistently under both RebalanceProtocol values. Otherwise the semantics of the existing KafkaConsumer#pause(...) are ambiguous, which causes great confusion for users.
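Steps <1> and <2> of suggestion 2) amount to remembering the paused set before the rebalance and intersecting it with the new assignment afterwards. The sketch below is not the draft from KAFKA-13425; `PausedStateKeeper` and its method bodies are hypothetical, and the partition type is left generic so the example needs only the JDK. An application could try similar bookkeeping from its own rebalance listener, but that wiring is an assumption and is not shown here.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: P stands for the partition type (TopicPartition in Kafka).
class PausedStateKeeper<P> {
    private Set<P> pausedBeforeRebalance = new HashSet<>();

    // Step <1>: remember which partitions were paused when the rebalance starts.
    void onJoinPrepare(Collection<P> currentlyPaused) {
        pausedBeforeRebalance = new HashSet<>(currentlyPaused);
    }

    // Step <2>: return the partitions whose paused mark should be restored,
    // i.e. those paused before the rebalance that are still assigned.
    Set<P> onJoinComplete(Collection<P> newAssignment) {
        Set<P> toRestore = new HashSet<>(pausedBeforeRebalance);
        toRestore.retainAll(newAssignment);
        // Marks of partitions that left the assignment are dropped here,
        // which is the loss described in the Note.
        pausedBeforeRebalance = new HashSet<>();
        return toRestore;
    }
}
```

For example, if `t-0` and `t-1` were paused and the new assignment is `t-1` and `t-2`, only `t-1` is restored. If `t-0` comes back in a later rebalance, it is no longer paused.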


      3) Pass the paused mark of topicPartitions through the group rebalance process

      In the JoinGroup request, in addition to reporting the topics it wants to subscribe to, each consumer member should also report its pausedTopicPartitions. The JoinGroup response received by the leader consumer should then contain all paused partitions of the entire group.

      The new assignment computed by the leader consumer should keep the paused marks and carry them in the leader's SyncGroup request.

      In this way, after the group rebalance completes, even if a paused topicPartition is assigned to a different consumer, the new consumer continues to keep the paused mark.

      As a consequence, the KafkaConsumer#paused() method can return partitions that this KafkaConsumer never paused itself through pause(Collection<TopicPartition> partitions).
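The leader-side part of suggestion 3) can be sketched as a pure function. Everything here is hypothetical: `GroupPausePropagation`, the use of member ids and partition names as strings, and the two input maps are stand-ins for whatever the JoinGroup and SyncGroup payloads would carry. No such fields exist in the protocol today as far as this description states.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the group leader's bookkeeping, not real protocol code.
class GroupPausePropagation {
    // reportedPaused: member id -> partitions that member reported as paused.
    // newAssignment:  member id -> partitions the leader assigns to that member.
    // Returns:        member id -> assigned partitions that must start out paused.
    static Map<String, Set<String>> pausedPerMember(
            Map<String, Set<String>> reportedPaused,
            Map<String, Set<String>> newAssignment) {
        // Union of every paused partition in the whole group.
        Set<String> groupPaused = new HashSet<>();
        for (Set<String> pausedByMember : reportedPaused.values()) {
            groupPaused.addAll(pausedByMember);
        }
        Map<String, Set<String>> result = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : newAssignment.entrySet()) {
            Set<String> pausedForMember = new TreeSet<>(entry.getValue());
            pausedForMember.retainAll(groupPaused);
            result.put(entry.getKey(), pausedForMember);
        }
        return result;
    }
}
```

If member A reported `t-0` as paused and the leader then assigns `t-0` to member B, the result marks `t-0` as paused for B, which is why `paused()` on B could list a partition B never paused itself.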


      4) KafkaConsumer provides topic-level pause methods, including regular expression support

      KafkaConsumer#pause(Collection<String> topics)

      KafkaConsumer#pause(Pattern pattern)

      Similar to the paused mark in SubscriptionState.assignment, we need a new instance variable ‘TopicState’ in SubscriptionState to store the topic-level paused mark. The ‘TopicState’ data structure can be modeled on the existing TopicPartitionState.

      <1> ‘TopicState’ should not be affected by group rebalance: the paused mark in TopicState does not change during the rebalance process. TopicState should be in-memory state of a single KafkaConsumer, and it does not have to be passed to other consumers after the rebalance completes.


      <2> pause(Collection<String> topics) throws IllegalStateException if this consumer is not currently subscribed to any of the provided topics


      <3> Fetcher's fetchedRecords() and sendFetches() can take TopicState into account when deciding whether to return a message to the user or to send a Fetch request


      <4> Provide KafkaConsumer#resume(Collection<String> topics) and KafkaConsumer#resume(Pattern pattern) methods to clear topic-level paused marks.
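A minimal sketch of what a topic-level ‘TopicState’ might look like is shown below. `TopicPauseState` and all of its members are hypothetical. It reads point <2> as "throw if any provided topic is not subscribed", which is an assumption, and it matches patterns against the topic name at lookup time so that topics subscribed later are covered as well.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical topic-level pause state held by a single consumer instance.
class TopicPauseState {
    private final Set<String> subscribed = new HashSet<>();
    private final Set<String> pausedTopics = new HashSet<>();
    private final List<Pattern> pausedPatterns = new ArrayList<>();

    // Subscription changes (and rebalances) never touch the paused marks.
    void subscribe(Collection<String> topics) {
        subscribed.clear();
        subscribed.addAll(topics);
    }

    void pause(Collection<String> topics) {
        for (String topic : topics) {
            if (!subscribed.contains(topic)) {
                throw new IllegalStateException("Not subscribed to topic: " + topic);
            }
        }
        pausedTopics.addAll(topics);
    }

    void pause(Pattern pattern) {
        pausedPatterns.add(pattern);
    }

    void resume(Collection<String> topics) {
        pausedTopics.removeAll(topics);
    }

    // Pattern has no value-based equals, so patterns are compared by their source text.
    void resume(Pattern pattern) {
        pausedPatterns.removeIf(p -> p.pattern().equals(pattern.pattern()));
    }

    // What fetchedRecords() and sendFetches() would consult per topic.
    boolean isPaused(String topic) {
        if (pausedTopics.contains(topic)) {
            return true;
        }
        for (Pattern pattern : pausedPatterns) {
            if (pattern.matcher(topic).matches()) {
                return true;
            }
        }
        return false;
    }
}
```

One design point to keep in mind: in Java, a `pause(Collection<String>)` overload has the same erasure as the existing `pause(Collection<TopicPartition>)`, so both could not be declared on the same class under the same name. A real API would probably need a different method name for the topic-level variant.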


      5) KafkaConsumer provides a consumer-level pause method


      The existing pause method works per topicPartition, which may sometimes be too fine-grained. Moreover, because the paused mark is bound to the assignment, it is inevitably affected by group rebalance.

      <1> This method may also be what users need most urgently. After pause() is called, the KafkaConsumer marks itself as paused, and the poll method checks the value of isKafkaConsumerPaused to decide whether to return messages to the user or to send a Fetch request. The isKafkaConsumerPaused mark should also be held by the single KafkaConsumer itself.


      <2> Users do not need to worry about the poll method returning data after calling the KafkaConsumer#pause() method.

      Users can keep calling the poll method while paused, which avoids the following two consequences of not calling poll for a long time:

                   (1) The heartbeat thread's detection mechanism causes the KafkaConsumer to leave the group proactively;

                   (2) If a group rebalance is triggered in the meantime, the group coordinator waits for this consumer to send a JoinGroup request. The rebalance cannot complete for a long time (bounded by max.poll.interval.ms), so all consumers in the group suspend consumption.


      <3> Provide a consumer-level KafkaConsumer#resume() to clear the paused mark of the KafkaConsumer
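The consumer-level flag from suggestion 5) can be sketched as follows. `PausableConsumerModel` is a hypothetical stand-in for a consumer. Its `poll()` only counts calls and drains an in-memory queue, with the counter representing the fact that polling continues and group membership stays alive.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Toy model only: hypothetical class, not the real KafkaConsumer.
class PausableConsumerModel {
    private final Deque<String> available = new ArrayDeque<>();
    private boolean paused;   // plays the role of isKafkaConsumerPaused
    private int pollCalls;    // each poll call keeps the member active in the group

    // Records that the broker would be able to serve.
    void deliver(String record) { available.add(record); }

    void pause() { paused = true; }

    void resume() { paused = false; }

    boolean isPaused() { return paused; }

    int pollCalls() { return pollCalls; }

    // While paused, poll still runs (no leaveGroup, no stalled rebalance),
    // but it returns nothing and fetches nothing.
    List<String> poll() {
        pollCalls++;
        if (paused) {
            return Collections.emptyList();
        }
        List<String> out = new ArrayList<>(available);
        available.clear();
        return out;
    }
}
```

Because the flag lives on the consumer object and not on the assignment, no rebalance can clear it. Only `resume()` does.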





      Assignee: Unassigned
      Reporter: RivenSun