Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-4753

KafkaConsumer susceptible to FetchResponse starvation

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • None
    • None
    • None
    • None

    Description

      FetchResponse starvation here means that the KafkaConsumer repeatedly fails to fully form FetchResponses within the request timeout from a subset of the brokers its fetching from while FetchResponses from the other brokers can get fully formed and processed by the application.

      In other words, this ticket is concerned with scenarios where fetching from some brokers hurts the progress of fetching from other brokers to the point of repeatedly hitting a request timeout.

      Some FetchResponse starvation scenarios:
      1. partition leadership of the consumer's assigned partitions is skewed across brokers, causing uneven FetchResponse sizes across brokers.
      2. the consumer seeks back on partitions on some brokers but not others, causing uneven FetchResponse sizes across brokers.
      3. the consumer's ability to keep up with various partitions across brokers is skewed, causing uneven FetchResponse sizes across brokers.

      I've personally seen scenario 1 happen this past week to one of our users in prod. They manually assigned partitions such that a few brokers led most of the partitions while other brokers only led a single partition. When NetworkClient sends out FetchRequests to different brokers in parallel with an uneven partition distribution, FetchResponses from brokers who lead more partitions will contain more data than FetchResponses from brokers who lead few partitions. This means the small FetchResponses will get fully formed quicker than larger FetchResponses. When the application eventually consumes a smaller fully formed FetchResponses, the NetworkClient will send out a new FetchRequest to the lightly-loaded broker. Their response will again come back quickly while only marginal progress has been made on the larger FetchResponse. Repeat this process several times and your application will have potentially processed many smaller FetchResponses while the larger FetchResponse made minimal progress and is forced to timeout, causing the large FetchResponse to start all over again, which causes starvation.

      To mitigate the problem for the short term, I've suggested to our user that they either:
      1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB to something like 1 MB. This is the solution I short-term solution I suggested they go with.
      2. reduce the "max.partition.fetch.bytes" down from the current default of 1 MB to something like 100 KB. This solution wasn't advised as it could impact broker performance.
      3. ask our SRE's to run a partition reassignment to balance out the partition leadership (partitions were already being led by their preferred leaders).
      4. bump up their request timeout. It was set to open-source's former default of 40 seconds.

      Contributing factors:
      1. uneven FetchResponse sizes across brokers.
      2. processing time of the polled ConsumerRecords.
      3. "max.poll.records" increases the number of polls needed to consume a FetchResponse, making constant-time overhead per poll magnified.
      4. "max.poll.records" makes KafkaConsumer.poll bypass calls to ConsumerNetworkClient.poll.
      5. java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and ConsumerNetworkClient.poll can return before the poll timeout as soon as a single channel is selected.
      6. NetworkClient.poll is solely driven by the user thread with manual partition assignment.

      So far I've only locally reproduced starvation scenario 1 and haven't even attempted the other scenarios. Preventing the bypass of ConsumerNetworkClient.poll (contributing factor 3) mitigates the issue, but it seems starvation would still be possible.

      How to reproduce starvation scenario 1:
      1. startup zookeeper
      2. startup two brokers
      3. create a topic t0 with two partitions led by broker 0 and create a topic t1 with a single partition led by broker 1

      > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 --replica-assignment 0,0
      > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 --replica-assignment 1
      

      4. Produce a lot of data into these topics

      > ./bin/kafka-producer-perf-test.sh --topic t0 --num-records 20000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
      > ./bin/kafka-producer-perf-test.sh --topic t1 --num-records 10000000 --record-size 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9090,localhost:9091
      

      5. startup a consumer that consumes these 3 partitions with TRACE level NetworkClient logging

      > ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.StarvedFetchResponseTest 10000 3000 65536
      

      The config/tools-log4j.properties:

      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #    http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      
      log4j.rootLogger=WARN, stderr
      
      log4j.appender.stderr=org.apache.log4j.ConsoleAppender
      log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
      log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
      log4j.appender.stderr.Target=System.err
      
      log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE, stderr
      log4j.additivity.org.apache.kafka.clients.NetworkClient=false
      

      The consumer code:

      /**
       * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
       * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
       * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
       * License. You may obtain a copy of the License at
       *
       * http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
       * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
       * specific language governing permissions and limitations under the License.
       */
      package org.apache.kafka.clients.consumer;
      
      import org.apache.kafka.common.TopicPartition;
      
      import java.util.ArrayList;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.Properties;
      import java.util.Set;
      
      public class StarvedFetchResponseTest {
          public static void main(String[] args) throws InterruptedException {
              long pollTimeout = Long.valueOf(args[0]);
              long sleepDuration = Long.valueOf(args[1]);
              String receiveBufferSize = args[2];
              Properties props = new Properties();
              props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090,localhost:9091");
              props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "fetch-response-starvation");
              props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
              props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
              props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
              props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
              props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
              props.setProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBufferSize);
              KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
              List<TopicPartition> partitions = new ArrayList<>();
              for (int i = 0; i < 2; i++) {
                  partitions.add(new TopicPartition("t0", i));
              }
              partitions.add(new TopicPartition("t1", 0));
              kafkaConsumer.assign(partitions);
              kafkaConsumer.seekToBeginning(partitions);
              while (true) {
                  ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(pollTimeout);
                  System.out.println(recordsPerTopic(records));
                  Thread.sleep(sleepDuration);
              }
          }
      
          private static Map<TopicPartition, Integer> recordsPerTopic(ConsumerRecords<byte[], byte[]> records) {
              Map<TopicPartition, Integer> result = new HashMap<>();
              Set<TopicPartition> partitions = records.partitions();
              for (TopicPartition partition : partitions) {
                  if (!result.containsKey(partition)) {
                      result.put(partition, 0);
                  }
                  result.put(partition, result.get(partition) + records.records(partition).size());
              }
              return result;
          }
      }
      

      After running it for 30 minutes, around 33 FetchResponses from broker 1 were served to the application while the many partially formed FetchResponses from broker 0 were cancelled due to a disconnect from request timeout. It seems that were was only one successful FetchResponse from broker 0 served to the application during this time.

      Attachments

        Issue Links

          Activity

            People

              onurkaraman Onur Karaman
              onurkaraman Onur Karaman
              Votes:
              0 Vote for this issue
              Watchers:
              9 Start watching this issue

              Dates

                Created:
                Updated: