Kafka / KAFKA-1215

Rack-Aware replica assignment option

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.10.0.0
    • Component/s: replication
    • Labels:
      None

      Description

      Adding a rack-id to the Kafka config. This rack-id can be used during replica assignment via the max-rack-replication argument in the admin scripts (create topic, etc.). By default the original replica assignment algorithm is used, because max-rack-replication defaults to -1. A max-rack-replication value greater than -1 is not honored if you are doing manual replica assignment (preferred).
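      A minimal Python sketch of the constraint max-rack-replication expresses, assuming it caps how many replicas of a single partition may share one rack and that -1 disables the cap. The function name and the broker-to-rack dict are hypothetical illustrations, not the patch's Scala code; rack IDs are shown as strings for readability.

```python
from collections import Counter
from typing import Dict, List


def violates_max_rack_replication(
    replicas: List[int],
    broker_rack: Dict[int, str],
    max_rack_replication: int = -1,
) -> bool:
    """Return True if some rack holds more than max_rack_replication replicas.

    Hypothetical helper: -1 (the default) means "no rack limit", in which case
    the original, rack-unaware assignment would be used.
    """
    if max_rack_replication == -1:
        return False
    # Count how many of this partition's replicas sit in each rack.
    per_rack = Counter(broker_rack[b] for b in replicas)
    return max(per_rack.values(), default=0) > max_rack_replication
```

      A check like this could also flag manual assignments that break the limit, which a later comment lists as missing.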

      If this looks good I can add some test cases specific to the rack-aware assignment.

      I can also port this to trunk. We are currently running 0.8.0 in production and need this, so I wrote the patch against that version.

      1. rack_aware_replica_assignment_v1.patch (46 kB, Joris Van Remoortere)
      2. rack_aware_replica_assignment_v2.patch (44 kB, Joris Van Remoortere)


          Activity

          junrao Jun Rao added a comment -

          Great, thanks Allen.

          allenxwang Allen Wang added a comment -

          Jun Rao Updated.

          junrao Jun Rao added a comment -

          Allen Wang, could you also update the changes to the ZK structure in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper ?

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/kafka/pull/132

          junrao Jun Rao added a comment -

          Issue resolved by pull request 132
          https://github.com/apache/kafka/pull/132

          aauradkar Aditya Auradkar added a comment -

          Thanks Allen. I'll review it this week.

          allenxwang Allen Wang added a comment -

          Aditya Auradkar Yes it is ready for review.

          aauradkar Aditya Auradkar added a comment -

          Allen Wang - Is this patch ready for review? I noticed you added several commits recently, but I'm not sure if you are done.

          varvind Vidhya Arvind added a comment -

          Is there any way this patch can be part of 0.9.0.0?

          allenxwang Allen Wang added a comment -

          Started the work on KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment

          aauradkar Aditya Auradkar added a comment -

          Allen Wang - bump.

          jjkoshy Joel Koshy added a comment -

          Allen Wang you should have access now.

          allenxwang Allen Wang added a comment -

          Aditya Auradkar Jun Rao Jay Kreps My Apache Confluence ID is allenxwang, the same as my JIRA ID. Please let me know when write access is granted. Thanks.

          aauradkar Aditya Auradkar added a comment -

          Allen Wang One of the committers can give you write access once you provide your Apache Confluence ID. Please let me know if you need any help with the KIP, reviews, etc. Thanks!

          allenxwang Allen Wang added a comment -

          Aditya Auradkar Sure, I can create a KIP. However, after signing up for the Apache wiki, I don't seem to have write permission: I don't see "Create" in the page header. Is there anything I need to do?

          aauradkar Aditya Auradkar added a comment -

          Allen Wang Hi Allen. Thanks for the patch. Can you create a KIP to discuss the proposed changes (since this patch adds configs and ZK structures)?
          https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

          We are hoping to leverage this patch within LinkedIn as well.

          allenxwang Allen Wang added a comment -

          Jun Rao Can you review the GitHub pull request or have someone take a look?

          githubbot ASF GitHub Bot added a comment -

          GitHub user allenxwang opened a pull request:

          https://github.com/apache/kafka/pull/132

          KAFKA-1215: Rack-Aware replica assignment option

          The PR tries to achieve the following:

          • Make rack-aware assignment and the rack data structure optional, as opposed to being part of the core data structure/protocol, to ease the migration. The implementation that returns the broker-to-rack map is pluggable: the user passes the implementation class as a Kafka runtime configuration or command-line argument.
          • The rack-aware replica assignment is best effort when distributing replicas to racks. When there are more replicas than racks, it ensures each rack has at least one replica. When there are more racks than replicas, it ensures each rack has at most one replica. It also tries to keep replicas evenly distributed among brokers and racks when possible.
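          The best-effort rack spread rules listed in the PR description can be expressed as a per-partition check. This is a Python sketch under the assumption that a partition's replicas are a list of broker IDs and that a hypothetical dict maps every broker in the cluster to a string rack ID; it is not code from the pull request.

```python
from collections import Counter
from typing import Dict, List


def meets_rack_spread_goal(replicas: List[int], broker_rack: Dict[int, str]) -> bool:
    """Check one partition against the best-effort rack spread rules.

    - replicas >= racks: every rack holds at least one replica
    - replicas <  racks: no rack holds more than one replica
    """
    all_racks = set(broker_rack.values())
    per_rack = Counter(broker_rack[b] for b in replicas)
    if len(replicas) >= len(all_racks):
        # Every rack in the cluster must appear among this partition's replicas.
        return set(per_rack) == all_racks
    # Fewer replicas than racks: each replica should be in a distinct rack.
    return all(count <= 1 for count in per_rack.values())
```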

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/allenxwang/kafka KAFKA-1215

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/132.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #132


          commit 3cccc5db23ee7987a1811d630f14de66a99ce638
          Author: Allen Wang <awang@netflix.com>
          Date: 2015-08-11T17:52:37Z

          KAFKA-1215: Rack-Aware replica assignment option


          allenxwang Allen Wang added a comment -

          Jun Rao An AWS region (for example, us-east-1) can be modeled as a DC. Each region has one or more zones (us-east-1c, us-east-1d, us-east-1e, etc.). We model each zone as a rack. Our Kafka cluster spans zones, but not regions.

          junrao Jun Rao added a comment -

          Allen Wang, thanks for the update. How do you model this in Netflix's deployment in AWS? Do you just model each DC as a rack?

          jkreps Jay Kreps added a comment -

          This is great!

          allenxwang Allen Wang added a comment -

          We now have a working solution for rack-aware assignment. It is based on the current patch for this JIRA, with some improvements. The key ideas of the solution are:

          • Rack ID is a String instead of an integer
          • For replica assignment, add an extra parameter of type Map[Int, String] to the assignReplicasToBrokers() method, which maps broker ID to rack ID
          • Before doing the rack-aware assignment, sort the broker list so that brokers are interlaced by rack. In other words, adjacent brokers should not be in the same rack if possible. For example, assuming 6 brokers mapping to 3 racks:

          0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 -> "rack2", 4 -> "rack3", 5 -> "rack3"

          The sorted broker list could be (0, 2, 4, 1, 3, 5)

          • Apply the same assignment algorithm to assign replicas, with the addition of skipping a broker if its rack is already used for the same partition (similar to what has been done in current patch)

          The benefit of this approach is that replica distribution is kept as even as possible to all the racks and brokers.
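          The interlacing and skipping steps described above can be sketched in Python. This is a simplified illustration, not the patch's Scala code, and it rests on several assumptions: rack IDs are strings (as proposed), racks are visited in sorted order, each partition starts one position further along the interlaced list, and once every rack holds a replica the rack check is dropped. Both function names are hypothetical.

```python
from typing import Dict, List


def rack_alternated_brokers(broker_rack: Dict[int, str]) -> List[int]:
    """Interlace brokers so that adjacent brokers sit in different racks where possible."""
    by_rack: Dict[str, List[int]] = {}
    for broker in sorted(broker_rack):
        by_rack.setdefault(broker_rack[broker], []).append(broker)
    racks = sorted(by_rack)
    result: List[int] = []
    depth = 0
    # Take the first broker of every rack, then the second of every rack, and so on.
    while len(result) < len(broker_rack):
        for rack in racks:
            if depth < len(by_rack[rack]):
                result.append(by_rack[rack][depth])
        depth += 1
    return result


def assign_replicas(num_partitions: int, replication_factor: int,
                    broker_rack: Dict[int, str]) -> Dict[int, List[int]]:
    """Round-robin over the interlaced list, skipping brokers whose rack is already used."""
    brokers = rack_alternated_brokers(broker_rack)
    num_racks = len(set(broker_rack.values()))
    if replication_factor > len(brokers):
        raise ValueError("replication factor exceeds the number of brokers")
    assignment: Dict[int, List[int]] = {}
    for partition in range(num_partitions):
        replicas: List[int] = []
        used_racks = set()
        idx = partition % len(brokers)  # shift the starting broker per partition
        while len(replicas) < replication_factor:
            broker = brokers[idx]
            rack = broker_rack[broker]
            # Once every rack has a replica, any broker not yet used is acceptable.
            rack_ok = rack not in used_racks or len(used_racks) == num_racks
            if broker not in replicas and rack_ok:
                replicas.append(broker)
                used_racks.add(rack)
            idx = (idx + 1) % len(brokers)
        assignment[partition] = replicas
    return assignment
```

          With the six-broker example, rack_alternated_brokers returns [0, 2, 4, 1, 3, 5], and partition 0 of a topic with replication factor 3 lands on brokers 0, 2 and 4, one per rack.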

          With regard to KAFKA-1792, an easy solution is to restrict replica movement to within the same rack, which I think should work in most practical cases. It also has the added benefit that replicas usually move faster within a rack. So basically we can apply the same algorithm described in KAFKA-1792 to each rack. For example, if there are three racks, apply the algorithm three times, each time with the broker list and assignment for that specific rack. Again, we assume the broker-to-rack mapping will be available in the method signature.
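          The per-rack approach can be sketched as two helpers: one projects a partition-to-replicas assignment onto each rack (the input the KAFKA-1792 algorithm would receive for that rack; the algorithm itself is not described in this thread and is not shown), and one verifies that a proposed reassignment keeps every replica in its original rack. This is a Python sketch with hypothetical names and data shapes.

```python
from collections import Counter
from typing import Dict, List


def split_by_rack(assignment: Dict[int, List[int]],
                  broker_rack: Dict[int, str]) -> Dict[str, Dict[int, List[int]]]:
    """Project a partition -> replicas assignment onto each rack separately."""
    per_rack: Dict[str, Dict[int, List[int]]] = {}
    for partition, replicas in assignment.items():
        for broker in replicas:
            rack = broker_rack[broker]
            per_rack.setdefault(rack, {}).setdefault(partition, []).append(broker)
    return per_rack


def moves_stay_within_racks(old: Dict[int, List[int]], new: Dict[int, List[int]],
                            broker_rack: Dict[int, str]) -> bool:
    """True if every partition keeps the same number of replicas in each rack."""
    if old.keys() != new.keys():
        return False

    def rack_counts(replicas: List[int]) -> Counter:
        return Counter(broker_rack[b] for b in replicas)

    return all(rack_counts(old[p]) == rack_counts(new[p]) for p in old)
```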

          The open question is how to obtain the broker-to-rack mapping. The information could be supplied when Kafka registers the broker with ZooKeeper, which means some information has to be added to ZooKeeper. However, in some deployments the rack information may already be available elsewhere; for example, it may be stored in a database. What we can do is abstract the API required to obtain rack information into an interface and allow users to supply an implementation on the command line or at broker startup (to handle auto topic creation).
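          A minimal Python sketch of such a pluggable interface, assuming the plug-in only needs to return a broker-ID-to-rack-ID map and is selected by a fully qualified class name. RackLocator, StaticRackLocator and load_rack_locator are hypothetical names, not Kafka APIs; the loader uses the standard importlib module.

```python
import importlib
from abc import ABC, abstractmethod
from typing import Dict


class RackLocator(ABC):
    """Hypothetical plug-in interface: returns a broker-id -> rack-id map."""

    @abstractmethod
    def broker_racks(self) -> Dict[int, str]:
        ...


class StaticRackLocator(RackLocator):
    """Example implementation backed by a fixed map (e.g. read from a database)."""

    def __init__(self, mapping: Dict[int, str]):
        self._mapping = dict(mapping)

    def broker_racks(self) -> Dict[int, str]:
        return dict(self._mapping)


def load_rack_locator(class_path: str, *args, **kwargs) -> RackLocator:
    """Instantiate an implementation named like 'package.module.ClassName'."""
    module_name, _, class_name = class_path.rpartition(".")
    cls = getattr(importlib.import_module(module_name), class_name)
    locator = cls(*args, **kwargs)
    if not isinstance(locator, RackLocator):
        raise TypeError(f"{class_path} does not implement RackLocator")
    return locator
```

          Taking the class name as a plain string mirrors the idea of passing the implementation on the command line or in the broker configuration; the same string could be handed to both the admin tools and the broker.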

          nehanarkhede Neha Narkhede added a comment -

          Allen Wang This was inactive for a while, but I think it will be good to wait until KAFKA-1792 is done to propose a solution for rack-awareness.

          allenxwang Allen Wang added a comment -

          What's the status of this JIRA?

          I have two questions:

          • Can we simply use a string for the rack ID? This will make it much easier to use in AWS, where the zone ID is a string. Otherwise there will be unnecessary code to convert them back and forth.
          • Why is max-rack-replication necessary? In most use cases you want an even distribution of replicas across racks without having to consider a maximum replication per rack.

          guozhang Guozhang Wang added a comment -

          Moving out of 0.8.2 for now.

          jvanremoortere Joris Van Remoortere added a comment -

          Joel Koshy I am out until the end of the month. I was going to take a deeper look at how to integrate this with auto-rebalance, since that has been released after my initial patch.

          jjkoshy Joel Koshy added a comment -

          Joris Van Remoortere did you get a chance to follow up on the review comments in RB 17248?

          jorgeo Jorge Ortiz added a comment -

          Any update on this? We're deploying Kafka on AWS and rack-awareness would be lovely.

          jvanremoortere Joris Van Remoortere added a comment -

          Since there is further interest in this #1357, I will try to look at it soon.

          guozhang Guozhang Wang added a comment -

          Some more comments on the RB.

          junrao Jun Rao added a comment -

          Sorry for not getting to this earlier, commented on the RB.

          jvanremoortere Joris Van Remoortere added a comment -

          Jun Rao could you please look at this? Review Request #17248

          Thanks!

          jvanremoortere Joris Van Remoortere added a comment -

          Rebased to trunk:

          • Made rack-id optional, defaulting to -1
          • Carry max-replication through ZooKeeper in order to support add-partitions

          Missing:

          • There is currently no warning when changing a broker's rack-id
          • There is no warning during manual replica (re)assignment if the assignment is not legal under the given max-rack-replication factor
          junrao Jun Rao added a comment -

          Thanks for the patch. Looks good overall. Some comments:

          1. KafkaConfig:
          1.1 We need a config for default max-rack-replication for auto topic creation.
          1.2 rackId: We probably don't want to make this a required property. So, perhaps we can default it to 0?

          2. AdminUtils.assignReplicasToBrokers():
          2.1 Could you add some comments on the rack-aware assignment algorithm?
          2.2 It's a bit weird for this method to take zkclient as input. We can probably pass in a list of Broker objects instead.

          3. Unit tests: I suggest that we leave most existing tests intact by keeping the rackId default and add a new test for rack-aware assignment.

          4. Compatibility test: It seems that the changes to the broker format in ZK are backward compatible. Could you double-check? For example, an old reader (controller, consumer, etc.) should be able to parse a broker registered in the new format, and a new reader should be able to parse a broker registered in the old format. Also, we should probably increase the version in the ZK registration for the broker.

          5. Could you rebase to trunk?
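          Item 4 asks that both old and new readers accept either registration format. A minimal Python sketch of a tolerant reader, assuming a JSON broker registration in which the new format adds an optional rack field and bumps the version number; the field names and version values here are illustrative, not the exact ZooKeeper schema. The same rule that makes it work (ignore unknown fields) is what lets an old reader accept the new format.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class BrokerInfo:
    host: str
    port: int
    rack: Optional[str]  # None when the registration predates rack support


def parse_broker_registration(payload: str) -> BrokerInfo:
    """Parse an old- or new-format registration (illustrative schema).

    Unknown fields such as "version" are ignored, and a missing "rack"
    field falls back to None, so both directions stay compatible.
    """
    data = json.loads(payload)
    return BrokerInfo(host=data["host"], port=int(data["port"]), rack=data.get("rack"))
```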

            People

            • Assignee:
              allenxwang Allen Wang
              Reporter:
              jvanremoortere Joris Van Remoortere
              Reviewer:
              Jun Rao
            • Votes:
              12
              Watchers:
              33
