Kafka / KAFKA-13180

Data Distribution among partitions not working as Expected

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.8.0
    • Fix Version/s: None
    • Component/s: clients
    • Labels: None

    Description

      Hi team, we are facing a weird issue. We are not sure whether anyone else has run into it, but we were able to identify the flow that causes it.

      Issue
      When the RoundRobin partitioner is used with an even number of partitions n, records are only ever produced to n/2 of the partitions.

      Is Reproducible: yes

      Scenario: We have a Kafka topic with 6 partitions (0,1,2,3,4,5), and we produce to it using the RoundRobin partitioner.
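
      For reference, a producer for this scenario could be configured roughly as sketched below. This is only an illustration: the class name RoundRobinProducerConfig, the broker address, and the String serializers are assumptions and do not come from the report. Only the partitioner.class setting matters for reproducing the issue.

```java
import java.util.Properties;

// Hypothetical helper for illustration; only "partitioner.class" is relevant to this issue.
class RoundRobinProducerConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Serializers are an assumption; any key/value serializers would do.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Select the round-robin partitioner that ships with kafka-clients.
        props.put("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        System.out.println(build("localhost:9092"));
    }
}
```

      The resulting Properties object would be passed to the KafkaProducer constructor.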

      The RoundRobin partitioner selects a partition by index from an ArrayList of partition info. For our case, let's assume the partitions are stored in the ArrayList in the following order.

      {1,2,3,4,5,0}

      Expected flow: Even distribution to 6 partitions
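
      The index-based selection described above can be modelled with a counter that is incremented on every call. The sketch below is a simplified stand-in, not the Kafka source; the class IndexRoundRobin and its methods are hypothetical names. It shows that one selection per record walks the list in order, which is the even distribution we expected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of index-based round-robin selection (hypothetical, not Kafka code).
class IndexRoundRobin {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final List<Integer> partitions; // partition ids in list order

    IndexRoundRobin(List<Integer> partitions) {
        this.partitions = partitions;
    }

    // Pick the partition at (counter mod size), then advance the counter.
    int next() {
        int index = Math.floorMod(counter.getAndIncrement(), partitions.size());
        return partitions.get(index);
    }

    // Collect the partitions chosen by the given number of consecutive calls.
    static List<Integer> pick(List<Integer> partitions, int calls) {
        IndexRoundRobin rr = new IndexRoundRobin(partitions);
        List<Integer> chosen = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            chosen.add(rr.next());
        }
        return chosen;
    }

    public static void main(String[] args) {
        // One call per record: every partition is chosen once per cycle.
        System.out.println(pick(List.of(1, 2, 3, 4, 5, 0), 6)); // prints [1, 2, 3, 4, 5, 0]
    }
}
```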

      Actual behavior: Data was produced only to partitions 2, 4, and 0.

      Why:
      On debugging the producer flow further, we noticed the code below in the doSend method of KafkaProducer.

      // First attempt: pick a partition and try to append with abortOnNewBatch = true.
      int partition = partition(record, serializedKey, serializedValue, cluster);
      tp = new TopicPartition(record.topic(), partition);
      .....
      RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
              serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
      if (result.abortForNewBatch) {
          // Second attempt: partition(...) is called again before appending with abortOnNewBatch = false.
          int prevPartition = partition;
          partitioner.onNewBatch(record.topic(), cluster, prevPartition);
          partition = partition(record, serializedKey, serializedValue, cluster);
          tp = new TopicPartition(record.topic(), partition);
          .....
          result = accumulator.append(tp, timestamp, serializedKey,
                  serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
      }

      Here, in the first call to accumulator.append, true is passed for abortOnNewBatch. The Deque obtained in the RecordAccumulator.append method is always empty for the first message, so a new batch has to be created. Because abortOnNewBatch is true, the append is aborted and result.abortForNewBatch is set.

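      For the new batch, partition(...) is called a second time and a new TopicPartition object is created. The second call moves the RoundRobin partitioner on to the next index, so the new TopicPartition has partition 2 instead of partition 1. In this flow abortOnNewBatch is passed as false, so the record is added to the Deque for this TopicPartition.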

      However, records are distributed properly if the total number of partitions is odd, because the first record is only added successfully when abortOnNewBatch is passed as false (let's call this the second invocation).

      The order of the invocations for an odd and for an even number of partitions is shown below. Each line gives the partition picked by the first invocation, followed by the partition picked by the second invocation.

      ODD: {1,2,3,4,0}
      Iteration set until all partitions are populated:
      1 - 2
      3 - 4
      0 - 1
      2 - 3
      4 - 0

      Deque-populated partitions = {2,4,1,3,0}

      EVEN: {1,2,3,4,5,0}

      Iteration set until all partitions are populated:
      1 - 2
      3 - 4
      5 - 0
      1 - 2
      3 - 4
      5 - 0
      1 - 2
      3 - 4
      5 - 0.........

      Deque-populated partitions = {2,4,0}

      This cycle continues indefinitely, so partitions 1, 3, and 5 are never initialized.
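
      The two traces above can be reproduced with a small stand-alone model. This is a sketch under the assumption made in the traces, namely that every record goes through both invocations (the first append is aborted, the second succeeds). The class RoundRobinSkipDemo and its methods are hypothetical names and are not part of Kafka.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the double invocation described in this report (not Kafka code).
class RoundRobinSkipDemo {
    private final List<Integer> partitions; // partition ids in list order
    private int counter = 0;                // models the partitioner's counter

    RoundRobinSkipDemo(List<Integer> partitions) {
        this.partitions = partitions;
    }

    // Models partition(...): pick by index, then advance the counter.
    private int nextPartition() {
        return partitions.get(counter++ % partitions.size());
    }

    // Models doSend for one record, assuming a new batch is always needed.
    int send() {
        nextPartition();        // first invocation: append aborted (abortOnNewBatch = true)
        return nextPartition(); // second invocation: append succeeds (abortOnNewBatch = false)
    }

    // Partitions that actually receive data, in the order they are first populated.
    static Set<Integer> populated(List<Integer> partitions, int records) {
        RoundRobinSkipDemo demo = new RoundRobinSkipDemo(partitions);
        Set<Integer> seen = new LinkedHashSet<>();
        for (int i = 0; i < records; i++) {
            seen.add(demo.send());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("ODD:  " + populated(List.of(1, 2, 3, 4, 0), 10));    // prints ODD:  [2, 4, 1, 3, 0]
        System.out.println("EVEN: " + populated(List.of(1, 2, 3, 4, 5, 0), 12)); // prints EVEN: [2, 4, 0]
    }
}
```

      Because each record advances the index by two, an even list size means only every second entry of the list is ever reached, while an odd list size eventually reaches every entry.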

          People

            Assignee: Unassigned
            Reporter: Suriya Vijayaraghavan (suriyav)