Kafka / KAFKA-13425

KafkaConsumer#pause() will lose its effect after a group rebalance occurs, which may cause data loss on the consumer side


Description

       
       

      Foreword:

Since I want to decouple the two processes of polling messages and consuming them on the KafkaConsumer side, I use a "poll --> push" architecture on the Kafka consumer side.


Architecture

See the attached picture "architecture_picture".

1) ThreadPoolExecutor

The key parameters of the ThreadPoolExecutor threadPool are:

(1) workQueue is a bounded ArrayBlockingQueue<Runnable>

(2) handler implements the RejectedExecutionHandler interface, so task submission fails when the queue is full

(3) threadPool.allowCoreThreadTimeOut(true)
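
A minimal sketch of such a pool follows; the pool sizes, queue capacity, and the AbortPolicy handler below are illustrative assumptions, not values from this report:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Illustrative values only; the report does not give concrete sizes.
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            4, 8,                                  // core / max pool size (assumed)
            60, TimeUnit.SECONDS,                  // keep-alive for idle threads
            new ArrayBlockingQueue<>(1024),        // bounded workQueue (assumed capacity)
            new ThreadPoolExecutor.AbortPolicy()); // handler: throws RejectedExecutionException when full
    threadPool.allowCoreThreadTimeOut(true);       // let core threads time out as well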

       

      2) KafkaConsumer

The disadvantage of this architecture is that if the business side's onMessage() method is slow to execute:

(1) The blockingQueue of the ThreadPoolExecutor accumulates a large number of tasks, and eventually pushing messages to it fails.

(2) How should the KafkaConsumer poll() method be handled while the push keeps failing? There are two options:

1. Stop calling poll()

The KafkaConsumer then needs configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE); to prevent the KafkaConsumer heartbeat thread from noticing that poll() has not been called for a long time and automatically executing maybeLeaveGroup("consumer poll timeout has expired.").

But the most serious consequence is that once a rebalance of the group is triggered for any reason, the rebalance of the entire group can never complete, because this KafkaConsumer does not call poll(). All consumers in the group then stop consuming.
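
For reference, the configuration for this option might look like the following sketch; the bootstrap servers and group id are placeholders I made up:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties configs = new Properties();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
    // effectively disable the "consumer poll timeout has expired" leave-group path
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);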

2. Keep calling poll() while the push fails

The purpose is to keep up the poll() calls of the KafkaConsumer, but without fetching any messages, because the downstream BlockingQueue that stores the messages is full. For this we need the help of KafkaConsumer#pause(...) and KafkaConsumer#resume(...). I named this special poll method maintainPoll4Rebalance().

       

Preliminary design of maintainPoll4Rebalance:

Simplified code:

    public static void main(String[] args) {
        while (true) {
            try {
                ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));

                // push every polled record; while the push fails, keep the group alive
                // with maintainPoll4Rebalance() instead of fetching more messages
                for (ConsumerRecord<String, Object> record : records) {
                    while (!publish(record)) {
                        try {
                            maintainPoll4Rebalance();
                        } catch (Exception e) {
                            log.error("maintain poll for rebalance with error {}", e.getMessage(), e);
                            CommonUtil.sleep(TimeUnit.SECONDS, 1);
                        }
                    }
                }

            } catch (Exception e) {
                log.error("KafkaConsumer poll message has error: {}", e.getMessage(), e);
                CommonUtil.sleep(TimeUnit.MILLISECONDS, ClientConfig.CLIENT_SLEEP_INTERVAL_MS);
            }
        }
    }

    private boolean publish(Object message) {

        try {

            ...

            // AbortPolicy throws RejectedExecutionException when the pool's queue is full
            threadPool.execute(() -> onMessage(message));

        } catch (RejectedExecutionException e) {
            log.error("consumer execute failed with error {}", e.getMessage(), e);
            return false;
        } catch (Exception e) {
            log.error("consumer execute failed with error {}", e.getMessage(), e);
            return false;
        }
        return true;
    }

    private void maintainPoll4Rebalance() {
        try {
            // pause all assigned partitions, so this poll() is expected to return nothing
            kafkaConsumer.pause(kafkaConsumer.assignment());
            ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                // with auto-commit enabled, these records are effectively lost
                log.error("kafka poll for rebalance discard some record!");
                for (ConsumerRecord<String, Object> consumerRecord : records) {
                    if (consumerRecord != null) {
                        log.error("this record need to retry, partition {} ,offset {}", consumerRecord.partition(), consumerRecord.offset());
                    }
                }
            }
        } catch (Exception e) {
            log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
        } finally {
            kafkaConsumer.resume(kafkaConsumer.assignment());
        }
    }
      

       

       

The maintainPoll4Rebalance() above seems to be a good solution to my problem: when downstream consumption is blocked, the KafkaConsumer keeps calling poll(), yet it is prevented from pulling further messages while the push is failing.

But in reality, the following logs appear at runtime:

      [main] ERROR ConsumerTest3 - kafka poll for rebalance discard some record!
      [main] ERROR ConsumerTest3 - this record need to retry, partition 0 ,offset 36901
      

I clearly called kafkaConsumer.pause(kafkaConsumer.assignment()) before kafkaConsumer#poll was called. Why does the KafkaConsumer still pull messages, and why are they lost? They are lost because the consumer has auto-commit of offsets enabled.

       

Root Cause Analysis

       

      KafkaConsumer#poll:
      1) updateAssignmentMetadataIfNeeded
      2) fetcher.fetchedRecords()
      3) fetcher.sendFetches();

These are the three most critical operations inside KafkaConsumer#poll. updateAssignmentMetadataIfNeeded is mainly responsible for the group-rebalance work. The root cause lies in steps 1 and 2.
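
A paraphrased sketch of that control flow (not the verbatim client source):

    // Inside KafkaConsumer#poll(Duration), roughly:
    // do {
    //     updateAssignmentMetadataIfNeeded(timer); // 1) join/sync group, invoke rebalance callbacks
    //     fetch = pollForFetches(timer);           //    which first calls:
    //         fetcher.fetchedRecords();            // 2) drain fetches already completed in memory
    //         fetcher.sendFetches();               // 3) issue new FetchRequests for fetchable partitions
    // } while (fetch is empty and timer not expired);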

       

1. updateAssignmentMetadataIfNeeded

       

We trace directly into ConsumerCoordinator#onJoinPrepare(...):

      else {
                  switch (protocol) {
                      case EAGER:
                          // revoke all partitions
                          revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                          exception = invokePartitionsRevoked(revokedPartitions);
      
                          subscriptions.assignFromSubscribed(Collections.emptySet());
      
                          break;
      
                      case COOPERATIVE:
                          // only revoke those partitions that are not in the subscription any more.
                          Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                          revokedPartitions = ownedPartitions.stream()
                              .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
                              .collect(Collectors.toSet());
      
                          if (!revokedPartitions.isEmpty()) {
                              exception = invokePartitionsRevoked(revokedPartitions);
      
                              ownedPartitions.removeAll(revokedPartitions);
                              subscriptions.assignFromSubscribed(ownedPartitions);
                          }
      
                          break;
                  }
              }
      

       

For the value of the protocol instance variable, see its initialization code:

              // select the rebalance protocol such that:
              //   1. only consider protocols that are supported by all the assignors. If there is no common protocols supported
              //      across all the assignors, throw an exception.
              //   2. if there are multiple protocols that are commonly supported, select the one with the highest id (i.e. the
              //      id number indicates how advanced the protocol is).
              // we know there are at least one assignor in the list, no need to double check for NPE
              if (!assignors.isEmpty()) {
                  List<RebalanceProtocol> supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols());
      
                  for (ConsumerPartitionAssignor assignor : assignors) {
                      supportedProtocols.retainAll(assignor.supportedProtocols());
                  }
      
                  if (supportedProtocols.isEmpty()) {
                      throw new IllegalArgumentException("Specified assignors " +
                          assignors.stream().map(ConsumerPartitionAssignor::name).collect(Collectors.toSet()) +
                          " do not have commonly supported rebalance protocol");
                  }
      
                  Collections.sort(supportedProtocols);
      
                  protocol = supportedProtocols.get(supportedProtocols.size() - 1);
              } else {
                  protocol = null;
              }
      

       

After a simple analysis we can see that as long as supportedProtocols contains RebalanceProtocol.COOPERATIVE, the protocol will be COOPERATIVE; otherwise it will be EAGER.

But checking the ConsumerPartitionAssignor interface, I found that all of its implementation classes except CooperativeStickyAssignor rely on the default:

    /**
     * Indicate which rebalance protocol this assignor works with;
     * By default it should always work with ConsumerPartitionAssignor.RebalanceProtocol.EAGER.
     */
    default List<RebalanceProtocol> supportedProtocols() {
        return Collections.singletonList(RebalanceProtocol.EAGER);
    }
      

       

So with the default assignors, execution falls into the EAGER branch:

                      case EAGER:
                          // revoke all partitions
                          revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                          exception = invokePartitionsRevoked(revokedPartitions);
      
                          subscriptions.assignFromSubscribed(Collections.emptySet());
      
                          break;
      

       

       

The problem is here: subscriptions.assignFromSubscribed(Collections.emptySet()) clears the assignment in my subscriptions, and with it the paused mark of every TopicPartition.
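
This is even observable through the public API: KafkaConsumer#paused() reports the currently paused partitions, and after a rebalance that took the EAGER branch it comes back empty. A minimal sketch, reusing the kafkaConsumer and log fields from the earlier snippets:

    // Sketch: detect that a rebalance inside poll() silently dropped the paused marks.
    kafkaConsumer.pause(kafkaConsumer.assignment());
    Set<TopicPartition> pausedBefore = kafkaConsumer.paused(); // non-empty now

    ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));

    // if assignFromSubscribed(emptySet) ran during the poll, the paused flags are gone
    if (!kafkaConsumer.paused().containsAll(pausedBefore)) {
        log.warn("paused marks were cleared by a rebalance: {}", pausedBefore);
    }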

2. fetcher.fetchedRecords()

There is no need to go deep into the code here: for every message set (CompletedFetch) already completed in memory, fetchedRecords verifies the corresponding TopicPartition:

      1)if (subscriptions.isPaused(nextInLineFetch.partition))

      2)if (!subscriptions.isAssigned(completedFetch.partition))

      3)if (!subscriptions.isFetchable(completedFetch.partition))

       

The problem: suppose that within the pollTimer specified by the user, a poll(...) call completes the updateAssignmentMetadataIfNeeded operation (so it returns true), the paused flags of the TopicPartitions have been cleared inside it, and the consumer's new assignment after the rebalance still contains those TopicPartitions. Then all of the checks above pass.

And since the nextInLineFetch variable in KafkaConsumer memory still holds messages for such a TopicPartition, KafkaConsumer#poll(...) will return them even though pause(...) was called. Even calling pause(...) before every single poll(...) does not prevent this.

If the business side cannot process these messages at this moment and the KafkaConsumer has auto-commit of offsets enabled, the messages are lost on the consumer side, up to max.poll.records of them per poll. The problematic pattern is exactly the one from maintainPoll4Rebalance():

              try {
                  kafkaConsumer.pause(kafkaConsumer.assignment());
                  ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
                  
              } catch (Exception e) {
                  log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
              } finally {
                  kafkaConsumer.resume(kafkaConsumer.assignment());
              }
      

       

       

       

       

maintainPoll4Rebalance() temporary solution

 

The paused mark of the TopicPartitions is restored in ConsumerRebalanceListener#onPartitionsAssigned(...):

    private boolean maintainPoll4Rebalance;

    private void initKafkaConsumer() {

        // subscribe(Collection, ConsumerRebalanceListener): pass the listener directly
        kafkaConsumer.subscribe(topics, new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                confirmMessageSync();
                log.info("consumer on partition revoked!");
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                try {
                    // re-apply the paused mark that the rebalance has just cleared
                    if (maintainPoll4Rebalance) {
                        kafkaConsumer.pause(kafkaConsumer.assignment());
                    }
                } catch (Exception e) {
                    log.error("consumer onPartitionsAssigned failed with error:{}!", e.getMessage(), e);
                }
                log.info("consumer on partition assigned!");
            }
        });
    }
          
          
    private void maintainPoll4Rebalance() {
        try {
            // raise the flag so the rebalance listener re-pauses after an assignment
            maintainPoll4Rebalance = true;
            kafkaConsumer.pause(kafkaConsumer.assignment());
            ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                log.error("kafka poll for rebalance discard some record!");
                for (ConsumerRecord<String, Object> consumerRecord : records) {
                    if (consumerRecord != null) {
                        log.error("this record need to retry, partition {} ,offset {}", consumerRecord.partition(), consumerRecord.offset());
                    }
                }
            }
        } catch (Exception e) {
            log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
        } finally {
            maintainPoll4Rebalance = false;
            kafkaConsumer.resume(kafkaConsumer.assignment());
        }
    }
      

       

Testing shows that this temporarily solves the problem: after kafkaConsumer#pause(...) is called, kafkaConsumer.poll(...) no longer returns records for the paused TopicPartitions.

      Suggestions

1. Make the semantics of KafkaConsumer#pause(...) precise

First look at the Javadoc of this method:

    /**
     * Suspend fetching from the requested partitions. Future calls to poll(Duration) will not return
     * any records from these partitions until they have been resumed using resume(Collection).
     * Note that this method does not affect partition subscription. In particular, it does not cause
     * a group rebalance when automatic assignment is used.
     *
     * @param partitions The partitions which should be paused
     * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
     */
    @Override
    public void pause(Collection<TopicPartition> partitions) {
        acquireAndEnsureOpen();
        try {
            log.debug("Pausing partitions {}", partitions);
            for (TopicPartition partition: partitions) {
                subscriptions.pause(partition);
            }
        } finally {
            release();
        }
    }
      

       

Nothing in this Javadoc tells us that pause() will lose its effect after a group rebalance.

And when the paused marks of the TopicPartitions are cleared, the KafkaConsumer logs nothing, so the user cannot tell that pause(...) is no longer in effect.

       

2. When invokePartitionsRevoked(revokedPartitions) is executed, should we also clean up the messages buffered in KafkaConsumer memory for the revokedPartitions?

If we clean them up, the cost is that after resume(...) the KafkaConsumer has to re-issue FetchRequests for the resumed partitions, which adds network traffic.

       

3. Better still, support a pause(...) on the KafkaConsumer side that is not affected by a group rebalance

1) When the rebalance starts to prepare, add new logic to ConsumerCoordinator#onJoinPrepare(...)

Before executing invokePartitionsRevoked(...) and subscriptions.assignFromSubscribed(...), save the currently paused partitions from the consumer's subscriptions.assignment into customerPausedPartitions, a new instance variable of ConsumerCoordinator:

    // new code, executed before the two existing calls below
    customerPausedPartitions = subscriptions.pausedPartitions();

    exception = invokePartitionsRevoked(...);
    subscriptions.assignFromSubscribed(...);
      

       

      2) After the rebalance is completed, add new logic to ConsumerCoordinator#onJoinComplete(...)

    protected void onJoinComplete(int generation,
                                  String memberId,
                                  String assignmentStrategy,
                                  ByteBuffer assignmentBuffer) {
        log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);

        ......

        subscriptions.assignFromSubscribed(assignedPartitions);

        // new code: restore the paused mark for partitions that survived the rebalance
        if (customerPausedPartitions != null && !customerPausedPartitions.isEmpty()) {
            customerPausedPartitions.forEach(topicPartition -> {
                if (subscriptions.isAssigned(topicPartition))
                    subscriptions.pause(topicPartition);
            });
            customerPausedPartitions = null;
        }

        // Add partitions that were not previously owned but are now assigned
        firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));

        ......
    }
      

The above is only a first draft of the modified code. It only guarantees that after a rebalance, the TopicPartitions that remain in the consumer's new assignment keep their paused mark.

Note: if the new assignment of the KafkaConsumer no longer contains TopicPartitions that were paused before the rebalance, their paused mark is lost for good on this consumer, even if a future rebalance assigns those partitions to it again.
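
On the application side this gap can be covered by keeping the desired pause state outside the consumer and re-applying it on every assignment, so that partitions which leave and later come back get paused again. A sketch (shouldPause is an application-level flag I introduced, not part of the client API):

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // re-apply the desired pause state after every rebalance;
        // pausing only the newly assigned partitions is always legal here
        if (shouldPause) {
            kafkaConsumer.pause(partitions);
        }
    }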

       

       

       

Attachments

  1. architecture_picture.png (58 kB, RivenSun)

People

  Assignee: Unassigned
  Reporter: RivenSun