KAFKA-44: Various ZK listeners to support intra-cluster replication

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      We need to implement the new ZK listeners for the new paths registered in ZK.
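
      As a rough illustration (a minimal sketch only, using the ZkClient library Kafka already depends on; the path and class names below are placeholders, not the actual implementation), registering such a listener might look like this:

        import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

        // Sketch: watch a (hypothetical) per-broker path for newly assigned partitions.
        class AssignedPartitionsListener(brokerId: Int) extends IZkChildListener {
          // Invoked by ZkClient whenever the children of the watched path change.
          override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
            if (currentChilds != null) {            // null when the watched path itself is deleted
              val it = currentChilds.iterator()
              while (it.hasNext)
                println("Broker %d saw assignment %s under %s".format(brokerId, it.next(), parentPath))
            }
          }
        }

        object ListenerRegistration {
          def register(zkClient: ZkClient, brokerId: Int): Unit = {
            // The path below is illustrative; the real paths come from the replication design doc.
            zkClient.subscribeChildChanges("/brokers/" + brokerId + "/assigned_partitions",
                                           new AssignedPartitionsListener(brokerId))
          }
        }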

        Issue Links

          Activity

          Neha Narkhede added a comment -

          Fixed as part of KAFKA-301

          Neha Narkhede added a comment -

          This is very closely related to the broker startup procedure

          Prashanth Menon added a comment -

          Sounds good to me. I'm also fine with removing this ticket and rolling this work into KAFKA-45 as another subtask, just for clarity. Your decision.

          Neha Narkhede added a comment - - edited

          This JIRA can implement the stateChangeListener() as described in the Kafka replication design document, and leave stubs for becomeFollower()/becomeLeader(), which are part of KAFKA-302. Also, let's leave out anything about partition reassignment for now. That work is included in other JIRAs and can be done once the basic replication functionality is nailed down and tested.

          Prashanth, I've attempted to reduce dependencies and define the scope of KAFKA-44 and KAFKA-45. Hopefully the above clarifies the scope of this JIRA. Assigning it to you, as per your request.
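
          To make that scope concrete, here is a minimal, hypothetical sketch of such a listener, with becomeLeader()/becomeFollower() left as stubs for KAFKA-302 (the request format and paths are illustrative, not the actual patch):

            import org.I0Itec.zkclient.{IZkDataListener, ZkClient}

            // Hypothetical sketch of a per-broker state change listener.
            class StateChangeListener(brokerId: Int) extends IZkDataListener {
              override def handleDataChange(dataPath: String, data: Object): Unit = {
                // A state change request was written to this broker's state change path.
                data.toString.split(":") match {
                  case Array("become-leader", topic, partition)   => becomeLeader(topic, partition.toInt)
                  case Array("become-follower", topic, partition) => becomeFollower(topic, partition.toInt)
                  case other => println("Unknown state change request: " + other.mkString(":"))
                }
              }

              override def handleDataDeleted(dataPath: String): Unit = ()

              // Stubs only; the real transitions belong to KAFKA-302.
              private def becomeLeader(topic: String, partition: Int): Unit = ()
              private def becomeFollower(topic: String, partition: Int): Unit = ()
            }

            object StateChangeListener {
              def register(zkClient: ZkClient, brokerId: Int): Unit =
                zkClient.subscribeDataChanges("/brokers/" + brokerId + "/state_change",
                                              new StateChangeListener(brokerId))
            }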

          Prashanth Menon added a comment -

          Also, mind if I assign this to myself?

          Prashanth Menon added a comment -

          That sounds fair enough. Can we create a wiki page attached to the one created for the overall replication work, as mentioned in KAFKA-50?

          Neha Narkhede added a comment -

          This approach of having a queue of state change requests that each replica acts upon is something I'm leaning towards for all state changes.

          There are two ways of making state changes in a system that uses ZK listeners:

          1. Each server listens on various ZK paths, registers the same listeners, and follows the same code path to apply state changes to itself. Here, the state machine is replicated on each server.
          2. A highly-available co-ordinator listens on various ZK paths, registers ZK listeners, and verifies system state and state transitions. It then issues state transition requests to the various replicas. Here, only the co-ordinator executes the state machine.

          We have been down the road of approach 1 before with the ZooKeeper consumer and found, through experience, that although it seems simpler to design and implement at first, it turns into a fairly buggy system with high operational overhead. That approach suffers from:

          1. the herd effect
          2. the "split brain" problem
          3. In addition, upgrading the state machine is quite complicated and can leave the cluster in an undefined state during the upgrade.
          4. Monitoring the state machine is a hassle, due to its distributed nature.

          Approach 2 runs the state machine only on the co-ordinator, which is itself elected from amongst the brokers. This approach ensures that:

          1. At any point in time, we can reason about the state of the entire cluster.
          2. Further state changes are applied only after the current state is verified. If verification fails, alerts can be triggered, preventing the system from getting into an undefined state.
          3. A big advantage of this approach is easier upgrades to the state machine. It is true that, theoretically, state machine logic doesn't change much over time, but in reality the state machine will need upgrades, whether to improve the logic or to fix bugs.
          4. Monitoring the state machine becomes much simpler.

          In general, both approaches are “doable”, but we need to weigh the cost of “patching” the code to make it work against choosing a simple design that will be easy to maintain and monitor.

          I would like to see a discussion on this fundamental design choice before jumping to code and patches on KAFKA-44 and KAFKA-45.
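
          To illustrate approach 2 concretely, here is a minimal sketch (the queue path, request format and election mechanics below are placeholders, not the design itself): the elected co-ordinator verifies a transition and then enqueues a state change request on a per-broker ZK path, which that broker drains in order.

            import org.I0Itec.zkclient.ZkClient
            import scala.collection.JavaConverters._

            // Hypothetical sketch of approach 2: only the co-ordinator runs the state machine
            // and pushes verified state change requests onto per-broker queues in ZK.
            object CoordinatorSketch {
              def stateChangeQueue(brokerId: Int) = "/brokers/" + brokerId + "/state_change_queue"

              // Co-ordinator side: verify the transition first, then enqueue it as a sequential znode.
              def issueStateChange(zkClient: ZkClient, brokerId: Int, request: String): Unit = {
                zkClient.createPersistentSequential(stateChangeQueue(brokerId) + "/request-", request)
              }

              // Broker side: drain the queue in order and apply each request locally.
              def drainQueue(zkClient: ZkClient, brokerId: Int)(apply: String => Unit): Unit = {
                val path = stateChangeQueue(brokerId)
                for (child <- zkClient.getChildren(path).asScala.sorted) {
                  val request = zkClient.readData[String](path + "/" + child)
                  apply(request)
                  zkClient.delete(path + "/" + child)
                }
              }
            }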

          Prashanth Menon added a comment -

          Okay, I've attached a very very rough draft of a patch. I'm really looking for feedback and thoughts because there's just a lot of overlap with KAFKA-45.

          1. New classes are Replica, Partition and ReplicaManager. kafka.replica.Partition will need to be merged with kafka.cluster.Partition, which is a little light at the moment.
          2. There are a few placeholders for KAFKA-45 and KAFKA-46 in there.
          3. Does not include the partition reassignment listener. We'll need to revisit this because the current process suffers from the same problem as the previous partition assignment logic.
          4. Includes the new assigned_partitions path but relies on old replica assignment path creation for testing purposes. This will need to change once the tools change.
          5. I've tried to make it as non-intrusive as possible.

          There is one issue I'm trying to wrap my head around. Consider a broker A that comes up with no partitions, and the admin reassigns a partition X to it. Broker A properly bootstraps the partition and catches up. Upon catching up, the current leader triggers a leader election; assume broker A wins. Broker A then does some bootstrapping before "switching on" the partition and serving fetch/produce requests. Part of that bootstrap is determining which replicas are in the ISR (read from ZK) by waiting for each replica to catch up, but because broker A isn't yet responding to fetch requests for the partition and isn't aware of where every replica is in terms of its HW and LEO, none of them will ever catch up "in time". Am I missing something?

          Jun Rao added a comment -

          It may be possible, but this can be a bit tricky. When you move a partition, you want to wait until the partition on the new broker has fully caught up before deleting the one on the old broker. So one way to achieve this is to have another ZK path that indicates this transition state. After the transition is done, the new assignment will be added to the assigned_partitions path.

          In any case, let's start by just focusing on static partition assignment. We can worry about partition reassignment a bit later when we get to KAFKA-42.

          Prashanth Menon added a comment -

          Can we also use this new path for partition reassignment? When the admin would like to reassign a partition, the broker's ID is appended to the list at /brokers/partitions_reassigned/[topic]/[partId] and a znode is created in /brokers/[brokerId]/assigned_partitions/[topic:partId]? Then only the leader listens on the reassignment path in order to update the leader replica's RAR, and the new brokers become aware of new partitions and bootstrap as they normally would. Thoughts?
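
          A rough sketch of the ZK writes that proposal implies (a hypothetical admin-side helper using ZkClient; the exact path layout is still being discussed in this thread, and parent paths are assumed to already exist):

            import org.I0Itec.zkclient.ZkClient

            // Hypothetical admin-side helper for the reassignment flow proposed above.
            object ReassignPartitionSketch {
              def reassign(zkClient: ZkClient, topic: String, partId: Int, brokerId: Int): Unit = {
                // 1. Append the broker id to the reassignment list for this partition.
                val reassignPath = "/brokers/partitions_reassigned/" + topic + "/" + partId
                if (zkClient.exists(reassignPath)) {
                  val current = zkClient.readData[String](reassignPath)
                  val updated = if (current == null || current.isEmpty) brokerId.toString
                                else current + "," + brokerId
                  zkClient.writeData(reassignPath, updated)
                } else {
                  zkClient.createPersistent(reassignPath, brokerId.toString)
                }

                // 2. Create the per-broker notification znode so the broker bootstraps the new replica.
                zkClient.createPersistent("/brokers/" + brokerId + "/assigned_partitions/" + topic + ":" + partId, "")
              }
            }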

          Prashanth Menon added a comment -

          Ah, I think I see where you're going with this. You'd like to use it as a persistent path listing all partitions assigned to a broker. If so, perhaps the path can be changed slightly to /brokers/[brokerId]/assigned_partitions/[topic:partitionId] to reflect that?

          My intention was to have that path just for new partition assignment notifications and have it act more like a queue. So the flow would be:

          1. Tool assigns topic X, partition Y to broker Z.
          1a. A new entry is added to /broker/topics/X/partitions/Y/replicas to append broker Z to the list.
          1b. A new znode is created at /broker/Z/new_partition/[X:Y].
          2. Broker Z listens on /broker/Z/new_partition.
          2a. When a new partition is assigned, it bootstraps the replicas.
          2b. After a successful bootstrap, it removes /broker/Z/new_partition/[X:Y].

          I like your idea much better; it's clear which partitions are assigned to which broker, and it makes startup simpler as well. (Broker-side handling of this flow is sketched below.)
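
          A minimal sketch of the broker side of the flow above (steps 2, 2a and 2b), assuming ZkClient; the listener and helper names are placeholders:

            import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
            import scala.collection.JavaConverters._

            // Hypothetical broker-side handling of the notification queue in the flow above.
            class NewPartitionListener(zkClient: ZkClient, brokerId: Int) extends IZkChildListener {
              private val queuePath = "/broker/" + brokerId + "/new_partition"

              override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
                if (currentChilds != null) {
                  for (entry <- currentChilds.asScala) {      // entry looks like "topic:partition"
                    val Array(topic, partition) = entry.split(":")
                    bootstrapReplica(topic, partition.toInt)  // 2a. bootstrap the newly assigned replica
                    zkClient.delete(queuePath + "/" + entry)  // 2b. dequeue after a successful bootstrap
                  }
                }
              }

              private def bootstrapReplica(topic: String, partition: Int): Unit = () // placeholder
            }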

          Jun Rao added a comment -

          Not sure if I follow exactly what you are proposing here. In particular, what does the broker recreate? My thinking is the following:

          /brokers/[brokerId]/new_partition/[topic:partitionId] is the source of truth about which topic/partitions a broker has, and that path is only created during topic creation. A broker listens to that path to pick up new topic/partitions assigned to it. When a broker starts up, it simply reads all of /brokers/[brokerId]/new_partition/[topic:partitionId] to determine the set of topic/partitions that it should have. So the broker never needs to recreate that path.
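
          In code, the startup side of that might look roughly like the following sketch (not actual Kafka code; it assumes the path layout discussed here):

            import org.I0Itec.zkclient.ZkClient
            import scala.collection.JavaConverters._

            // Sketch: on startup, read the full set of topic/partitions assigned to this broker
            // from the children of its /brokers/[brokerId]/new_partition path.
            object StartupSketch {
              def assignedPartitions(zkClient: ZkClient, brokerId: Int): Set[(String, Int)] = {
                val path = "/brokers/" + brokerId + "/new_partition"
                if (!zkClient.exists(path)) Set.empty
                else zkClient.getChildren(path).asScala.map { child =>
                  val Array(topic, partition) = child.split(":")
                  (topic, partition.toInt)
                }.toSet
              }
            }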

          Prashanth Menon added a comment -

          Sounds good. Another question is what to do with that path should the broker go down with pending assignments. My vote would be for the broker to recreate it as part of the startup process, thereby deleting previous data. My thinking is that the broker will discover all assigned replicas naturally as part of startup, assuming the topic creation admin tool did its job of correctly updating the partition replica lists in ZK.

          Jun Rao added a comment -

          Prashanth,

          This seems like a good idea to me. Ideally, this new ZK path /brokers/[brokerId]/new_partition/[topic:partitionId] should be created atomically with the other ZK paths constructed when a new topic is created. We could leverage ZK's multi-operation transaction support for this when ZK 3.4 is more stable.
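
          For reference, ZK 3.4's multi() could be used along these lines. This is only a sketch against the raw ZooKeeper client (the ZkClient wrapper may not expose multi()), with illustrative paths and the assumption that the parent paths already exist:

            import org.apache.zookeeper.{CreateMode, Op, ZooDefs, ZooKeeper}
            import scala.collection.JavaConverters._

            // Sketch: create the per-broker assignment znode atomically with the topic's
            // replica-list path using ZooKeeper 3.4 multi().
            object AtomicTopicCreationSketch {
              def createTopicPaths(zk: ZooKeeper, topic: String, partitionId: Int, brokerId: Int): Unit = {
                val acl = ZooDefs.Ids.OPEN_ACL_UNSAFE
                val ops = Seq(
                  Op.create("/brokers/topics/" + topic + "/partitions/" + partitionId + "/replicas",
                            brokerId.toString.getBytes("UTF-8"), acl, CreateMode.PERSISTENT),
                  Op.create("/brokers/" + brokerId + "/new_partition/" + topic + ":" + partitionId,
                            Array[Byte](), acl, CreateMode.PERSISTENT)
                )
                zk.multi(ops.asJava)  // either both znodes are created, or neither is
              }
            }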

          Prashanth Menon added a comment -

          Hi all,

          I spent today working through this (and consequently KAFKA-45, because there's a lot of overlap), hoping to have a patch in mid-week, and had a suggestion regarding how new replicas are assigned to brokers. Currently, every broker effectively listens to every possible path in /brokers/topic/[topic]/partitions/[partition] because it doesn't know which partition replicas will be assigned to it. This places a heavy burden on ZK, since it will have to fire an unnecessarily high volume of callbacks to every broker.

          The problem is that the broker doesn't know which partitions it will be assigned replicas for after it's up. I propose an approach whereby a new path in ZK acts as a queue that brokers watch to receive new replica assignments. This path would exist for every broker in the system, so something like /brokers/[brokerId]/new_partition/[topic:partitionId] or some such thing? The admin utilities would create these paths along with the replica assignments. I realize this is additional overhead and somewhat asymmetric with the other partition paths, so I'm interested to hear people's thoughts!


            People

            • Assignee:
              Neha Narkhede
            • Reporter:
              Jun Rao
            • Votes:
              0
            • Watchers:
              1

              Dates

              • Created:
                Updated:
                Resolved:
