Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 0.8.0
Fix Version/s: None
Environment:
$ uname -a
Linux vrd01.atlnp1 2.6.32-279.el6.x86_64 #1 SMP Fri Jun 22 12:19:21 UTC 2012 x86_64 x86_64 x86_64 GNU/Linux
$ java -version
java version "1.6.0_25"
Java(TM) SE Runtime Environment (build 1.6.0_25-b06)
Java HotSpot(TM) 64-Bit Server VM (build 20.0-b11, mixed mode)
Kafka 0.8.0 loaded from HEAD on 1/29/2013
Description
Based on an email thread in the user group, Neha asked me to submit this.
Original question: "> I ran another test, again starting with a full cluster and all partitions
> had a full set of copies. When I stop the broker which was leader for 9 of
> the 10 partitions, the leaders were all elected on one machine instead of
> the set of 3. Should the leaders have been better spread out? Also the
> copies weren’t fully populated either."
Neha: "For problem 2, we always try to make the preferred replica (1st replica
in the list of all replicas for a partition) the leader, if it is
available. We intended to spread the preferred replica for all partitions
for a topic evenly across the brokers. If this is not happening, we need to
look into it. Please can you file a bug and describe your test case there ?"
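The rule Neha describes can be written out in a few lines of Java. This is only a sketch of the stated intent, not Kafka's actual controller code; the helper name is mine, and the fallback to the next live replica in list order is how the test below reads the rule.

import java.util.List;
import java.util.Set;

public class ExpectedLeader {
    // Sketch of the stated rule: the preferred replica is the first entry in a
    // partition's assigned replica list and should lead whenever it is alive;
    // otherwise the next live replica in list order takes over.
    static String expectedLeader(List<String> assignedReplicas, Set<String> liveBrokers) {
        for (String broker : assignedReplicas) {
            if (liveBrokers.contains(broker)) {
                return broker;
            }
        }
        return null; // no live replica: the partition is offline
    }
}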
Configuration:
4 node cluster
1 topic with 3 replicas
10 partitions: 0-9 below
Current status:
Partition: 0:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]
Partition: 1:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 2:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd02.atlnp1]
Partition: 3:vrd03.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 4:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd02.atlnp1]
Partition: 5:vrd03.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 6:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]
Partition: 7:vrd01.atlnp1 R:[ vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 8:vrd03.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 9:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd03.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1]
Shutting down vrd03 led to:
Partition: 0:vrd01.atlnp1 R:[ ] I:[]
Partition: 1:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 2:vrd01.atlnp1 R:[ ] I:[]
*Partition: 3:vrd04.atlnp1 R:[ ] I:[]
Partition: 4:vrd01.atlnp1 R:[ ] I:[]
*Partition: 5:vrd04.atlnp1 R:[ ] I:[]
Partition: 6:vrd01.atlnp1 R:[ ] I:[]
Partition: 7:vrd01.atlnp1 R:[ vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
*Partition: 8:vrd04.atlnp1 R:[ ] I:[]
Partition: 9:vrd01.atlnp1 R:[ ] I:[]
(* means leader changed)
Note that partitions 3, 5 and 8 were assigned new leaders.
Per an email group thread with Neha, the new leader should be assigned from the preferred replica. So partition 3 should have gotten vrd02, partition 5 vrd04, and partition 8 vrd02 (since vrd03 was shut down). Instead, all three got vrd04.
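To make the expected values concrete, here is a small self-contained check (hypothetical code, with the replica lists copied from the "Current status" dump above) that applies the same first-live-replica rule to the three partitions whose leaders moved:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class CheckFailover {
    public static void main(String[] args) {
        // vrd03 is down; the other three brokers are alive.
        Set<String> live = new HashSet<String>(
                Arrays.asList("vrd01.atlnp1", "vrd02.atlnp1", "vrd04.atlnp1"));

        // Partition id followed by its assigned replica list, from the dump above.
        String[][] partitions = {
                { "3", "vrd02.atlnp1", "vrd03.atlnp1", "vrd04.atlnp1" },
                { "5", "vrd04.atlnp1", "vrd02.atlnp1", "vrd03.atlnp1" },
                { "8", "vrd03.atlnp1", "vrd02.atlnp1", "vrd04.atlnp1" },
        };
        for (String[] p : partitions) {
            String expected = null;
            for (String broker : Arrays.asList(p).subList(1, p.length)) {
                if (live.contains(broker)) { expected = broker; break; } // first live replica
            }
            System.out.println("Partition " + p[0] + " expected leader: " + expected);
        }
        // Prints vrd02 for 3, vrd04 for 5 and vrd02 for 8, whereas the
        // dump above shows vrd04 elected for all three.
    }
}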
Restarting vrd03 led to:
Partition: 0:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]
Partition: 1:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 2:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 3:vrd04.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 4:vrd01.atlnp1 R:[ vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 5:vrd04.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 6:vrd01.atlnp1 R:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]
Partition: 7:vrd01.atlnp1 R:[ vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Partition: 8:vrd04.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
Partition: 9:vrd01.atlnp1 R:[ vrd04.atlnp1 vrd03.atlnp1 vrd01.atlnp1] I:[ vrd01.atlnp1 vrd04.atlnp1 vrd03.atlnp1]
Stopping vrd01 now led to:
*Partition: 0:vrd04.atlnp1 R:[ ] I:[]
*Partition: 1:vrd04.atlnp1 R:[ ] I:[]
*Partition: 2:vrd02.atlnp1 R:[ ] I:[]
Partition: 3:vrd04.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
*Partition: 4:vrd02.atlnp1 R:[ ] I:[]
Partition: 5:vrd04.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
*Partition: 6:vrd04.atlnp1 R:[ ] I:[]
*Partition: 7:vrd04.atlnp1 R:[ ] I:[]
Partition: 8:vrd04.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1]
*Partition: 9:vrd04.atlnp1 R:[ ] I:[]
(* means leader changed)
So partitions 0, 2, 4, 6 and 7 were assigned the wrong leader (assuming the preferred replica is first in the replica list; if it is last in the list, only 1 and 2 are wrong).
Java code used to dump the partition state above (cleaned up into a runnable class):

import java.util.ArrayList;
import java.util.List;
import kafka.cluster.Broker;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class DumpPartitionState {
    public static void main(String[] args) {
        // Ask any live broker for the topic's metadata.
        SimpleConsumer consumer = new SimpleConsumer("vrd04.atlnp1", 9092, 100000, 64 * 1024, "test");
        List<String> topics = new ArrayList<String>();
        topics.add("storm-anon");
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        TopicMetadataResponse resp = consumer.send(req);

        // One line per partition: leader, replica list (R) and in-sync replicas (I).
        for (TopicMetadata topic : resp.topicsMetadata()) {
            for (PartitionMetadata part : topic.partitionsMetadata()) {
                StringBuilder replicas = new StringBuilder();
                for (Broker replica : part.replicas()) {
                    replicas.append(" ").append(replica.host());
                }
                StringBuilder isr = new StringBuilder();
                for (Broker broker : part.isr()) {
                    isr.append(" ").append(broker.host());
                }
                // Guard against a partition that momentarily has no leader.
                String leader = (part.leader() == null) ? "none" : part.leader().host();
                System.out.println("Partition: " + part.partitionId() + ":" + leader
                        + " R:[" + replicas + "] I:[" + isr + "]");
            }
        }
    }
}
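Run against any live broker, this prints the per-partition lines shown in the dumps above; only the bootstrap host ("vrd04.atlnp1" here) needs to change to point at a reachable broker.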