Kafka / KAFKA-783

Preferred replica assignment on leader failure may not be correct


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: replication

    Description

      Based on an email thread on the users mailing list, Neha asked me to submit this.

      Original question: "> I ran another test, again starting with a full cluster and all partitions
      > had a full set of copies. When I stop the broker which was leader for 9 of
      > the 10 partitions, the leaders were all elected on one machine instead of
      > the set of 3. Should the leaders have been better spread out? Also the
      > copies weren’t fully populated either."

      Neha: "For problem 2, we always try to make the preferred replica (1st replica
      in the list of all replicas for a partition) the leader, if it is
      available. We intended to spread the preferred replica for all partitions
      for a topic evenly across the brokers. If this is not happening, we need to
      look into it. Please can you file a bug and describe your test case there?"
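
      For context, the even spread Neha describes comes from how each partition's first replica is chosen at topic-creation time. A minimal illustrative sketch of round-robin placement of the preferred replica (this is not Kafka's actual assignment code; the broker names are reused from the test cluster):

      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.List;

      // Illustrative only: rotate each partition's first (preferred) replica
      // round-robin across the brokers, then take the next brokers as followers.
      public class PreferredSpreadSketch {
          public static void main(String[] args) {
              List<String> brokers = Arrays.asList("vrd01", "vrd02", "vrd03", "vrd04");
              int partitions = 10;
              int replicationFactor = 3;
              for (int p = 0; p < partitions; p++) {
                  List<String> replicas = new ArrayList<String>();
                  for (int r = 0; r < replicationFactor; r++) {
                      replicas.add(brokers.get((p + r) % brokers.size()));
                  }
                  // replicas.get(0) is the preferred replica for partition p.
                  System.out.println("Partition " + p + " preferred=" + replicas.get(0)
                          + " replicas=" + replicas);
              }
          }
      }

      With this scheme no broker is the preferred replica for more than ceil(10/4) = 3 partitions, which is the even spread the test expected.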

      Configuration:
      4 node cluster
      1 topic with 3 replicas
      10 partitions: 0-9 below

      Current status:

      Partition: 0:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]
      Partition: 1:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
      Partition: 2:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd02.atlnp1]
      Partition: 3:vrd03.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
      Partition: 4:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd02.atlnp1]
      Partition: 5:vrd03.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
      Partition: 6:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]
      Partition: 7:vrd01.atlnp1 R:[ vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
      Partition: 8:vrd03.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
      Partition: 9:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd03.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]

      Shutting down vrd03 led to:

      Partition: 0:vrd01.atlnp1 R:[ ] I:[]
      Partition: 1:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
      Partition: 2:vrd01.atlnp1 R:[ ] I:[]
      *Partition: 3:vrd04.atlnp1 R:[ ] I:[]
      Partition: 4:vrd01.atlnp1 R:[ ] I:[]
      *Partition: 5:vrd04.atlnp1 R:[ ] I:[]
      Partition: 6:vrd01.atlnp1 R:[ ] I:[]
      Partition: 7:vrd01.atlnp1 R:[ vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
      *Partition: 8:vrd04.atlnp1 R:[ ] I:[]
      Partition: 9:vrd01.atlnp1 R:[ ] I:[]
      (* means leader changed)

      Note that partitions 3, 5 and 8 were assigned new leaders.

      Per an email thread with Neha, the new leader should be assigned from the preferred replica. So partition 3 should have gotten vrd02, partition 5 vrd04, and partition 8 vrd02 (since vrd03 was shut down). Instead, 3 got vrd04, 5 got vrd04, and 8 got vrd04.
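
      Under that expectation, the new leader is simply the first broker in the partition's replica list that is still alive. A small hypothetical check of that rule (class and method names are illustrative, not Kafka API); the same check applies to the second failover below:

      import java.util.Arrays;
      import java.util.HashSet;
      import java.util.List;
      import java.util.Set;

      // Hypothetical check: walk the replica list in order and take the first
      // broker that is still up; compare against the leader actually elected.
      public class ExpectedLeaderSketch {
          static String expectedLeader(List<String> replicas, Set<String> downBrokers) {
              for (String broker : replicas) {
                  if (!downBrokers.contains(broker)) {
                      return broker;
                  }
              }
              return null; // no live replica at all
          }

          public static void main(String[] args) {
              Set<String> down = new HashSet<String>(Arrays.asList("vrd03.atlnp1"));
              // Partition 3's replica list from the first listing above.
              List<String> p3 = Arrays.asList("vrd02.atlnp1", "vrd03.atlnp1", "vrd04.atlnp1");
              // Prints vrd02.atlnp1, while vrd04.atlnp1 was actually elected.
              System.out.println(expectedLeader(p3, down));
          }
      }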

      Restarting vrd03 led to:

      Partition: 0:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]
      Partition: 1:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
      Partition: 2:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
      Partition: 3:vrd04.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
      Partition: 4:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
      Partition: 5:vrd04.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
      Partition: 6:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]
      Partition: 7:vrd01.atlnp1 R:[ vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
      Partition: 8:vrd04.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
      Partition: 9:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd03.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]

      Stopping vrd01 now led to:

      *Partition: 0:vrd04.atlnp1 R:[ ] I:[]
      *Partition: 1:vrd04.atlnp1 R:[ ] I:[]
      *Partition: 2:vrd02.atlnp1 R:[ ] I:[]
      Partition: 3:vrd04.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
      *Partition: 4:vrd02.atlnp1 R:[ ] I:[]
      Partition: 5:vrd04.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
      *Partition: 6:vrd04.atlnp1 R:[ ] I:[]
      *Partition: 7:vrd04.atlnp1 R:[ ] I:[]
      Partition: 8:vrd04.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
      *Partition: 9:vrd04.atlnp1 R:[ ] I:[]

      (* means leader changed)

      So partitions 0, 2, 4, 6 and 7 were assigned the wrong leader, if the preferred replica is the first in the replica list (if it is the last in the list, then 1 and 2 are the wrong ones). For example, partition 0's replica list is [ vrd03 vrd04 vrd01 ], so with vrd01 down the first live replica is vrd03, yet vrd04 became the leader.

      Java code:

      import java.util.ArrayList;
      import java.util.List;

      import kafka.cluster.Broker;
      import kafka.javaapi.PartitionMetadata;
      import kafka.javaapi.TopicMetadata;
      import kafka.javaapi.TopicMetadataRequest;
      import kafka.javaapi.TopicMetadataResponse;
      import kafka.javaapi.consumer.SimpleConsumer;

      // Connect to one broker; a metadata request can be served by any live broker.
      SimpleConsumer consumer = new SimpleConsumer("vrd04.atlnp1", 9092, 100000, 64 * 1024, "test");

      // Request metadata for the test topic.
      List<String> topics = new ArrayList<String>();
      topics.add("storm-anon");
      TopicMetadataRequest req = new TopicMetadataRequest(topics);
      TopicMetadataResponse resp = consumer.send(req);

      // For each partition, print the leader, the replica list (R) and the ISR (I).
      for (TopicMetadata item : resp.topicsMetadata()) {
          for (PartitionMetadata part : item.partitionsMetadata()) {
              String replicas = "";
              for (Broker replica : part.replicas()) {
                  replicas += " " + replica.host();
              }
              String isr = "";
              for (Broker replica : part.isr()) {
                  isr += " " + replica.host();
              }
              System.out.println("Partition: " + part.partitionId() + ":" + part.leader().host()
                      + " R:[" + replicas + "] I:[" + isr + "]");
          }
      }
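
      For reference, this is the program that produced the partition listings above; since topic metadata can be fetched from any live broker, it can be pointed at whichever node is still up during a failover test.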


          People

            Assignee: Sriram (sriramsub)
            Reporter: Chris Curtin (chriscurtin)
            Votes: 0
            Watchers: 3
