Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:

      Description

      This JIRA will involve implementing the list of actions to be taken on broker startup, as listed in the brokerStartup() and startReplica() algorithms in the Kafka replication design doc. Since the stateChangeListener is part of KAFKA-44, this JIRA can leave it as a stub.

      1. kafka-301-draft.patch
        18 kB
        Neha Narkhede
      2. kafka-301-v1.patch
        36 kB
        Neha Narkhede
      3. kafka-301-v2.patch
        37 kB
        Neha Narkhede
      4. kafka-301-v3.patch
        45 kB
        Neha Narkhede

        Issue Links

          Activity

          Neha Narkhede added a comment -

          Thanks for reviewing the patches! Just committed this.

          Neha Narkhede added a comment -

          Thanks for the review, Jun!

          3.1. This check protects against a subtle issue. The take() API removes a node only from the internal queue, not from ZK. An item is removed from the ZkQueue only through the remove() API; this is to handle the "act-and-then-delete" behavior for a state change. Now, if another item is added to the ZkQueue after the previous item was taken, but before it was removed from ZK, there is a risk of adding that previous item back into the internal queue. latestQueueItemPriority tracks the last item that was taken from the queue, so only items with a priority larger than that can be added to the queue (a rough sketch of this interaction follows at the end of this comment).

          3.2 Will remove that on the checkin.
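
          A minimal sketch of that act-and-then-delete interaction, purely for illustration (the class and method names below are assumptions, not the actual ZkQueue code in this patch): take() only dequeues from the in-memory queue and records the priority it handed out, remove() is what deletes the node in ZK, and the listener refuses to re-enqueue anything at or below the last taken priority.

          import java.util.concurrent.LinkedBlockingQueue

          // illustrative only; mirrors the description above, not the patch
          class ZkQueueSketch {
            private val internalQueue = new LinkedBlockingQueue[(Long, String)]()
            // priority (ZK sequence number) of the last item handed out by take()
            @volatile private var latestQueueItemPriority: Long = -1L

            // called from the ZK child-change listener for every child still present in ZK;
            // items at or below the last taken priority are never re-enqueued
            def maybeEnqueue(priority: Long, data: String): Unit = {
              if (priority > latestQueueItemPriority)
                internalQueue.put((priority, data))
            }

            // dequeues from the internal queue only; the ZK node stays until remove()
            def take(): (Long, String) = {
              val item = internalQueue.take()
              latestQueueItemPriority = item._1
              item
            }

            // act-and-then-delete: the caller processes the item first, then deletes its ZK node
            def remove(priority: Long): Unit = {
              // the real implementation would delete the corresponding sequential node from ZK here
            }
          }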

          Jun Rao added a comment -

          v3 looks good to me. A couple of minor comments:
          31. ZkQueue: Is latestQueueItemPriority used to prevent sequential nodes being added out of order in ZK? If so, should we log an error if this does happen?
          32. StateChangeCommandHandler.run(): local vals topic, partition, and epoch are not used.

          Neha Narkhede added a comment -

          Anybody up for reviewing patch v3?

          Neha Narkhede added a comment -

          Jun's comments -
          1, 4, 5, 6. Fixed.

          2. Changed the ZkQueue to be sort of a blocking queue as per Jay's suggestion. It is not a complete blocking queue since the put operation does not block. The leader needs to know that the queue is full immediately to be able to initiate some kind of queue shrink operation. The dequeue operation is blocking.

          3.1 Removed the StateChangeListener since the ZkQueue now wraps its own listener
          3.2 Good point. Fixed it.

          Jay's comments -
          1. You have a valid point here. I agree that we should be able to refactor the code to wrap up custom ZK logic in Kafka classes. Right now, the leader election logic is stuck in there, and hence the epoch increment API is too. But since we might end up changing the leader election algorithm itself, I would suggest waiting a bit before attempting this refactoring.

          2. Changed ZkQueue and StateChangeCommand for consistency.

          3. I like this suggestion, gave it a shot. Let me know what you think.

          Jay Kreps added a comment -

          Here are some superficial comments:
          1. This adds more kafka-specific logic into ZkUtils as static methods. I think Prashanth commented on this. Maybe given the current state, it is better to ignore this for now, but I feel this is a bad approach, as ZkUtils is fundamentally unmockable, right? A better way would be to make fully object-oriented classes that represent the domain logic we have around partitions, clusters, epochs, etc. This is harder to do because you have to think through the model, but ultimately easier to work with and test, I feel. It definitely makes sense to have utility methods like readData(), but application-specific methods like incrementEpochForPartition() are less good.
          2. "ZKQueue" should be "ZkQueue" for consistency I think.
          3. StateChangeRequest is not really a request in the sense of Request.scala, right? We are already unfortunately overloading Request twice, once for API objects and once for the RequestQueue items. Could we call these Commands?
          4. I like the ZKQueue a lot, I think that approach to wrapping up zk logic is very good. One thing though is that the usage of this queue is a bit difficult because as I understand the poll/drain methods are non-blocking, so one has to write a zk watcher to trigger the action. I wonder if a better way wouldn't be to wrap up the zk watcher logic as well so that the queue acted more like a juc BlockingQueue. In this case the usage would be more like while(true) { val cmd = q.take(); handle(cmd); }. In other words, I should be able to start a kafka broker I have instantiated with a fake ZkQueue and pump in cmds to test it out without ZK. I am not sure if this is really possible to do or if it would actually be better, just throwing it out there...
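
          For what it's worth, here is a rough sketch of that take-and-handle loop and the fake-queue testing idea, with made-up names (CommandQueue, FakeCommandQueue and StateChangeLoopSketch are not classes from this patch): the broker only ever calls take(), so a test can substitute an in-memory queue and never touch ZK.

          import java.util.concurrent.LinkedBlockingQueue

          // all names here are hypothetical
          trait CommandQueue {
            def take(): String
          }

          // in-memory stand-in for tests; no ZooKeeper involved
          class FakeCommandQueue(cmds: String*) extends CommandQueue {
            private val inner = new LinkedBlockingQueue[String]()
            cmds.foreach(inner.put)
            override def take(): String = inner.take()
          }

          object StateChangeLoopSketch {
            // a real broker would loop forever: while(true) { handle(queue.take()) }
            def run(queue: CommandQueue, handle: String => Unit, iterations: Int): Unit = {
              for (_ <- 1 to iterations)
                handle(queue.take())
            }

            def main(args: Array[String]): Unit = {
              val fake = new FakeCommandQueue("become-leader:topic1:0", "become-follower:topic1:1")
              run(fake, cmd => println("handled " + cmd), iterations = 2)
            }
          }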

          Jun Rao added a comment -

          Thanks for the patch. Some comments:
          1. some new files without Apache header.

          2. ZKQueue: Reading items one at a time out of the queue seems expensive since we have to read all children, sort them and get the head. The drainAll and readAll APIs are much more efficient. Do we really need APIs like poll and peek that deal with one item at a time?

          3. KafkaZooKeeper:
          3.1 Can StateChangeRequestHandler be created inside StateChangeListener?
          3.2 StateChangeListener.ensureEpochValidity(): Not all state changes are initiated by a leader. For example, create topic DDL directly publishes state change requests.

          4. LeaderElectionTest: unused imports

          5. No need to change VerifyConsumerRebalance.

          6. Yes, it would be better if we use json representing the value of each state change.

          Neha Narkhede added a comment -

          I think one thing that can be improved is the structure of our ZK metadata. This is not limited just to this patch. I think znodes that store more than one value could benefit from using a simple JSON format instead of combinations of "," and ":". This is true for the state change nodes as well as the ISR path.

          Although, since this is just a format change, we can still review v2, since the main functionality added is the state change listener, epoch and the ZKQueue.
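
          Just to make the comparison concrete, here is a hypothetical example of the two styles for a znode that stores a leader and an ISR (the field names are illustrative, not a proposed schema):

          object ZnodeFormatSketch {
            // current style: positional values glued together with ":" and ","
            val delimited = "0:1,2,3"

            // suggested style: self-describing JSON (hypothetical field names)
            val json = """{"leader": 0, "ISR": [1, 2, 3]}"""

            def main(args: Array[String]): Unit = {
              println(delimited)
              println(json)
            }
          }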

          Neha Narkhede added a comment -

          I moved QueueFullException from kafka.producer.async to kafka.common. I think that somehow didn't get reflected in the v1 patch.

          Uploading a patch that applies cleanly on 0.8

          Jun Rao added a comment -

          Can't seem to apply patch v1 to 0.8 branch.

          patching file core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
          patching file core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
          patching file core/src/test/scala/unit/kafka/server/StateChangeTest.scala
          patching file core/src/main/scala/kafka/producer/Producer.scala
          patching file core/src/main/scala/kafka/producer/async/QueueFullException.scala
          patching file core/src/main/scala/kafka/admin/AdminUtils.scala
          patching file core/src/main/scala/kafka/common/NoEpochForPartitionException.scala
          can't find file to patch at input line 374
          Perhaps you used the wrong -p or --strip option?
          The text leading up to this was:
          --------------------------

          Index: core/src/main/scala/kafka/common/QueueFullException.scala
          ===================================================================
          --- core/src/main/scala/kafka/common/QueueFullException.scala (revision 1304473)
          +++ core/src/main/scala/kafka/common/QueueFullException.scala (working copy)
          --------------------------
          File to patch:
          Neha Narkhede added a comment -

          1. Introduced ZKQueue to communicate state change requests from the leader to the followers. However, the logic for shrinking a full queue is left for another patch, when we actually have create/delete topic or add/remove partition that enqueues requests into the queue.

          2. State change requests are persistent, since some state changes need to be communicated even when the follower is not up (e.g. delete topic)

          3. Introduced leader epochs. The follower can skip certain state change requests if the request epoch is less than the current epoch for that partition (a small sketch of this check follows below).

          4. Added tests for the ZKQueue, and left TODOs for testing StateChangeRequestHandler. These tests can be added as part of create/delete topic support
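
          To illustrate point 3, here is a minimal sketch of the epoch check a follower could apply; the class and method names are made up and not taken from the patch.

          import scala.collection.mutable

          // hypothetical names throughout
          case class StateChangeCommand(topic: String, partition: Int, epoch: Int)

          class EpochCheckSketch {
            private val currentEpoch = mutable.Map.empty[(String, Int), Int]

            // true if the command should be processed, false if it came from a deposed leader
            def shouldProcess(cmd: StateChangeCommand): Boolean = {
              val key = (cmd.topic, cmd.partition)
              val known = currentEpoch.getOrElse(key, -1)
              if (cmd.epoch < known) {
                false
              } else {
                currentEpoch(key) = cmd.epoch
                true
              }
            }
          }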

          Neha Narkhede added a comment -

          Canceling patch since the epoch generation can be done without sequential nodes in ZK

          Neha Narkhede added a comment - edited

          The refactoring in KAFKA-307 attempted to reduce such interdependencies amongst the various server components. Your suggestion of keeping all logic related to managing replicas in ReplicaManager makes a lot of sense. However, it seems like a good idea to keep all ZK specific logic bundled inside KafkaZookeeper. Ideally, LogManager, ReplicaManager and KafkaZookeeper should really not know about each other. Only KafkaServer "manages" these sub-components. But, in reality, it could be slightly difficult to keep these sub components completely independent of each other.

          Prashanth Menon added a comment -

          A suggestion I'd like to throw out there: can we move the core replica logic (leaderElection, becomeLeader and becomeFollower) to where replicas are actually managed, namely ReplicaManager? I think this will keep things cleaner and more concise. If we do this, we should extract kafka-specific ZK logic out of ZkUtils into KafkaZooKeeper, such as functions to read the ISR, AR and leader replica or register leader change listeners. With this, ReplicaManager becomes the central place to manage replicas; it uses KafkaZooKeeper to interact with ZK in a kafka-specific way and uses LogManager to create local replicas. KafkaApis, StateRequestHandler and all the topic/leader/partition reassignment listeners use ReplicaManager to manage replicas in a general fashion.

          Thoughts?
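
          A rough sketch of the layering being proposed, with placeholder signatures (none of these method bodies or names are taken from the actual codebase): ReplicaManager is the single entry point for replica state changes and delegates ZK access to KafkaZooKeeper and log creation to LogManager.

          // placeholder signatures only
          class KafkaZooKeeper {
            def readIsr(topic: String, partition: Int): Seq[Int] = Seq.empty
            def readAssignedReplicas(topic: String, partition: Int): Seq[Int] = Seq.empty
            def registerLeaderChangeListener(listener: (String, Int) => Unit): Unit = ()
          }

          class LogManager {
            def createLog(topic: String, partition: Int): Unit = ()
          }

          // callers such as KafkaApis or a state change handler would only ever talk to ReplicaManager
          class ReplicaManager(zk: KafkaZooKeeper, logManager: LogManager) {
            def becomeLeader(topic: String, partition: Int): Unit = {
              val isr = zk.readIsr(topic, partition)
              println("becoming leader with ISR " + isr.mkString(","))
            }

            def becomeFollower(topic: String, partition: Int): Unit = {
              logManager.createLog(topic, partition)
              // start fetching from the current leader here
            }
          }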

          Neha Narkhede added a comment -

          Sounds good. Probably letting the JIRA and related discussions/patches be might be a good idea. This JIRA is marked to incorporate KAFKA-44.

          Prashanth Menon added a comment - edited

          Haha, no problem Neha. I'll let you wrap up KAFKA-301 and I can get started on KAFKA-302. If all goes well, I can get a patch in by Wednesday and after reviews and adjustments, we should be able to commit by Friday.

          We should also probably go ahead and remove KAFKA-44?

          Neha Narkhede added a comment -

          Yeah, after finishing up on KAFKA-300, I took up this one, and while working on it, the line between KAFKA-44 and this one started blurring. Though the real piece of work would be KAFKA-302. Sorry, didn't mean to step on someone else's toes. Just want to clear the path to let my work on KAFKA-46 progress. Having said that, do you want to pick up KAFKA-302 or even pick up this one? I actually don't mind anything as long as we have KAFKA-45 resolved by next week.

          Prashanth Menon added a comment -

          Uh oh, it looks like this patch implements functionality that was part of KAFKA-44? Do we still need 44 then?

          Neha Narkhede added a comment -

          This patch adds the state change listeners, although they will be exercised only when KAFKA-302 is in.

          1. Introduced ZKQueue to communicate state change requests from the leader to the followers

          2. State change requests are persistent, since some state changes need to be communicated even when the follower is not up (e.g. delete topic)

          3. Introduced leader epochs. The follower can skip certain state change requests if the request epoch is less than the current epoch for that partition

          4. Added tests for the ZKQueue, and left TODOs for testing StateChangeRequestHandler. These tests can be added as part of KAFKA-302

          Neha Narkhede added a comment -

          Since broker startup needs to use leader election


            People

            • Assignee:
              Neha Narkhede
              Reporter:
              Neha Narkhede
            • Votes:
              0
              Watchers:
              0
