KAFKA-329 (sub-task of KAFKA-47: Create topic support and new ZK data structures for intra-cluster replication)

Remove the watches/broker for new topics and partitions and change create topic admin API to send start replica state change to all brokers

    Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels:

      Description

      Currently in 0.8, all brokers register a watch on /brokers/topics and /brokers/topics/[topic] for all topics in a Kafka cluster. The watches are required to discover new topics.
      There is another way this can be achieved, as proposed here - https://cwiki.apache.org/confluence/display/KAFKA/Kafka+replication+detailed+design+V2#KafkareplicationdetaileddesignV2-Createtopic

      Basically, the create-topic admin command sends a start-replica state change request to every broker in the assigned replicas list.
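The push-based flow described above can be sketched roughly as follows. This is a minimal in-memory illustration, not Kafka code: the `Broker` class, `handle_start_replica`, and `create_topic` are hypothetical names, and a real implementation would send the request over the network rather than call a method.

```python
from dataclasses import dataclass, field


@dataclass
class Broker:
    """Hypothetical in-memory stand-in for a broker; not a Kafka class."""
    broker_id: int
    started_replicas: list = field(default_factory=list)

    def handle_start_replica(self, topic, partition):
        # A real broker would create the local replica here.
        self.started_replicas.append((topic, partition))


def create_topic(topic, assignment, brokers):
    """Send one start-replica request per (partition, assigned broker).

    `assignment` maps partition id -> list of broker ids (assigned replicas).
    Returns the set of broker ids that were contacted.
    """
    contacted = set()
    for partition, replica_ids in sorted(assignment.items()):
        for broker_id in replica_ids:
            brokers[broker_id].handle_start_replica(topic, partition)
            contacted.add(broker_id)
    return contacted


brokers = {i: Broker(i) for i in range(3)}
contacted = create_topic("events", {0: [0, 1], 1: [1, 2]}, brokers)
```

With this approach only the brokers named in the assignment learn about the topic, so no broker needs a ZooKeeper watch to discover it.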

      1. KAFKA-329-v1.patch
        40 kB
        Prashanth Menon
      2. KAFKA-329-DRAFT-v2.patch
        33 kB
        Prashanth Menon
      3. KAFKA-329-DRAFT.patch
        29 kB
        Prashanth Menon

        Activity

        Prashanth Menon added a comment -

        Committed to 0.8.

        Prashanth Menon added a comment -

        Hi Jun,

        31, 32, 33, 34 - All Done. I've committed the changes to 0.8.

        As for the transient failure, I'm not able to reproduce it, but I suspect it may have something to do with the timing of topic creation and leader election. The tests in ZooKeeperConsumerConnectorTest use auto-creation of topics, so the DefaultEventHandler may attempt to send messages three times (by default) and fail before the leader is able to bootstrap itself. If this is indeed the case, we can either increase the producer retry count or, more deterministically, create the topic and wait for the leader using the normal CreateTopicCommand.createTopic and TestUtils.waitUntilLeaderIsElected methods (which I'd prefer).
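The difference between the two options can be sketched as below. This is an illustration under stated assumptions, not the Kafka test utilities: `wait_until`, `FakeCluster`, and `send_with_retries` are hypothetical names, and the fake cluster simply reports a leader after a fixed number of polls.

```python
import time


def wait_until(condition, timeout_s=5.0, interval_s=0.05):
    """Poll `condition` until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


class FakeCluster:
    """Hypothetical cluster whose leader appears after N polls."""

    def __init__(self, polls_until_leader):
        self._remaining = polls_until_leader

    def leader_elected(self):
        if self._remaining > 0:
            self._remaining -= 1
            return False
        return True


def send_with_retries(cluster, retries=3):
    """Mimic a producer that gives up after a fixed number of attempts."""
    return any(cluster.leader_elected() for _ in range(retries))
```

A fixed retry count fails whenever election takes longer than the retries allow, whereas waiting on the condition first makes the subsequent send independent of election timing (up to the timeout).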

        Jun Rao added a comment -

        Prashanth,

        I was trying to figure out why ZookeeperConsumerConnectorTest.testCompressionSetConsumption() failed. For that, I added a bit more logging in DefaultEventHandler. However, after the logging was added, the test seems to pass. I suspect this is a time-dependent transient failure. I checked in the extra logging as a trivial change. So, if you can address the remaining comments and there are no more unit test failures, you can just commit the patch without further review.

        Jun Rao added a comment -

        Prashanth,

        Thanks for patch v1. Some comments:

        31. AdminUtils: remove unused imports

        32. CreateTopicCommand.createTopic(): add space after if when assigning to partitionReplicaAssignment

        33. TopicChangeListener.handleChildChange(): add a TODO comment for handling topic deletion.

        34. ZookeeperConsumerConnectorTest.testCompressionSetConsumption() seems to always fail on sending messages now. Not clear to me why though.

        Jay Kreps added a comment -

        I recommend we just use the json library scala includes. It is supposed to be slow and not very general, but for our limited purposes that is probably fine.

        Prashanth Menon added a comment - - edited

        Hi Jun,

        New patch with your changes incorporated. I think it's functionally complete, but I may have missed some things.

        As for a JSON library, I'm not sure what plays well specifically with Scala, though I suspect most basic Java JSON libs should be fine. I personally have experience with Jackson, which has a nice Scala wrapper lib called Jerkson (https://github.com/codahale/jerkson). There is also Lift-JSON, which I'm not familiar with, though I've heard good things about it from people. Perhaps integrating a JSON lib can be part of a separate JIRA?

        Jun Rao added a comment -

        Thanks for draft v2. Some quick comments:

        21. KafkaZookeeper: Do we still need onTopicChange and initLeader if we keep handleNewTopics?

        22. AdminUtils: We should get rid of the old way of storing topic data in ZK and just keep the new way.

        As for a JSON library, does any JSON library work well with Scala?

        Prashanth Menon added a comment -

        Thanks for the review, Jun. A new draft is attached incorporating both your suggestions, though it's still a draft and not final. My only concern, and I'm not sure whether it's my machine or not, is that the ZooKeeperConsumerConnectorTest takes an unusually long time to run and throws exceptions intermittently.

        I'll take a final walkthrough tomorrow, along with any additional suggestions and submit another final patch for review.

        Jun Rao added a comment -

        Prashanth,

        Thanks for the patch. Some comments:

        1. AdminUtils.createTopicPartitionAssignmentPathInZK(): In V3, we don't need the topic version anymore. So, we don't need to store it in the ZK.

        2. KafkaZookeeper: For this patch, we can just keep the main logic the same as in V2 (i.e., assuming no controller). The only changes will be based on that (1) partition assignment is stored in 1 ZK path per topic; (2) partitions and partition assignment never change after created (such logic will be added later in the controller). So, we can keep subscribeToTopicAndPartitionsChanges(), but obtain partition assignment from a different ZK path. Ditto for handleNewTopics. We can punt on the handling of delete topics since there is a separate jira for that. We can get rid of handleNewPartitions(). Most of the code in this class can probably be reused when we move the logic to the controller.

        Prashanth Menon added a comment - - edited

        Hi all,

        I've attached a very early draft of the work required here. It changes the CreateTopic ddl, currently writing to both old and new locations to keep tests running while the patch is completed. The largest change comes in KafkaZooKeeper to the topic watcher. Thinking about it, most of the code in there regarding leader election will be gone when the controller goes in, but I've left the important pieces in there to keep tests working. I'll continue working to get the consumer ZooKeeperConsumerConnector side working with the new path.

        I'd like to get some preliminary feedback on the patch. Hopefully, I'm going in the right direction based on my understanding of the V3 design. Comments welcome.

        Side note: As we move into storing info in ZK as json, should we investigate using a friendlier library? The native Scala one is quite nasty.

        Prashanth Menon added a comment -

        Hi Jun,

        Yes, I'm looking to wrap it up this weekend.

        Jun Rao added a comment -

        Prashanth,

        Do you plan to work on this jira anytime soon? Thanks,

        Jun Rao added a comment -

        We know that we want to change the ZK layout from the current "1 path per partition" design to the "1 path per topic" one. This can be done either before we move to the controller implementation or after. Doing this as part of the controller implementation may make it too big.

        I was thinking that in this patch, we just move the ZK layout to 1 path per topic without the controller logic in place. We will need to (1) patch the create/delete/list topic ddl; (2) change the topic listener in both KafkaZookeeper and ZookeeperConsumerConnector to read from the new path. This makes it easier when we move to the controller implementation since the correct ZK layout is already in place. If we change the ZK layout later, we will need to first implement the controller logic based on the current ZK layout and change it again later.
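To make the two layouts concrete, here is a rough sketch that models ZooKeeper as a plain dictionary of path to data. The child paths under /brokers/topics/[topic] and the JSON shape are assumptions chosen for illustration; the thread does not spell out the exact schema, and all function names are hypothetical.

```python
import json


def per_partition_layout(topic, assignment):
    """Old style: one znode per partition (path shape is an assumption)."""
    return {
        f"/brokers/topics/{topic}/{partition}": ",".join(map(str, replicas))
        for partition, replicas in sorted(assignment.items())
    }


def per_topic_layout(topic, assignment):
    """New style: one znode per topic holding the whole assignment as JSON."""
    payload = json.dumps(
        {str(partition): replicas for partition, replicas in sorted(assignment.items())}
    )
    return {f"/brokers/topics/{topic}": payload}


def read_assignment(znodes, topic):
    """Recover partition -> replica list from the per-topic znode."""
    raw = json.loads(znodes[f"/brokers/topics/{topic}"])
    return {int(partition): replicas for partition, replicas in raw.items()}


assignment = {0: [0, 1], 1: [1, 2]}
old_znodes = per_partition_layout("events", assignment)
new_znodes = per_topic_layout("events", assignment)
```

With one path per topic, a listener reads a single znode to get the full assignment instead of walking one child per partition.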

        Prashanth Menon added a comment -

        So, changing the CreateTopicCommand and AdminUtils to place the topic+replica assignment into the new ZK path is simple, but I'm not sure changing the ZK watch structure can be done in this ticket without either breaking several tests that rely on existing ZK watch behaviour or implementing additional watchers that will be integrated/used when the controller is implemented. It may be a better idea to make this a subtask of the controller implementation ticket and roll it in with the larger controller patch? Or perhaps break up the controller ticket by the sections outlined in the V3 wiki along with a common framework?

        Let me know if I'm missing something or overcomplicating things. The V3 design is a little new to me, so I may have missed some discussions.

        Jun Rao added a comment -

        The ticket is to revisit topic creation based on the V3 design (https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3). We probably should first change the ZK layout by using 1 ZK node per topic, instead of 1 ZK node per partition. This will make it easier to register watches for new topics. Then, when the controller framework is ready, we can move the logic of handling new topics to the controller.


          People

          • Assignee:
            Prashanth Menon
            Reporter:
            Neha Narkhede
          • Votes:
            0
            Watchers:
            3
