KAFKA-554

Move all per-topic configuration into ZK and add to the CreateTopicCommand

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.1
    • Component/s: None
    • Labels:

      Description

      We have a number of per-topic configurations that control message retention and flush interval. Here is the list of properties I find in KafkaConfig that appear to be per-topic:
      topic.log.file.size
      topic.log.roll.hours
      topic.log.retention.hours
      topic.log.retention.size
      topic.flush.intervals.ms
      Currently we specify these in server.properties. This is not a good solution as it requires a rolling bounce of the cluster to make a change, which just doesn't scale to having hundreds of topics. Also the map encoded in a CSV string is kind of hacky.

      We should move these into ZK in some kind of JSON blob that allows easily adding new per-topic configs and we should remove these from server.properties.

      It would be good to start with a wiki design and get consensus on that first.
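      As a rough illustration only (the actual schema would come out of that wiki design), the per-topic znode might hold a small JSON object; the layout below is a sketch, though the property names `segment.bytes` and `retention.ms` are the ones that appear later in this thread:

      ```json
      {
        "version": 1,
        "config": {
          "segment.bytes": "1073741824",
          "retention.ms": "1000000"
        }
      }
      ```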

      1. KAFKA-554-v4.patch
        127 kB
        Jay Kreps
      2. KAFKA-554-v3.patch
        124 kB
        Jay Kreps
      3. KAFKA-554-v2.patch
        124 kB
        Jay Kreps
      4. KAFKA-554-v1.patch
        134 kB
        Jay Kreps

        Activity

        Jay Kreps created issue -
        Jay Kreps made changes -
        Issue Type: Bug → New Feature
        Jay Kreps made changes -
        Description: appended "It would be good to start with a wiki design and get consensus on that first."
        Jay Kreps made changes -
        Labels: project
        Jay Kreps added a comment -

        This patch does two things:
        1. Implement a dynamic configuration mechanism for topics
        2. Remove the scripts bin/kafka-list-topic.sh, bin/kafka-delete-topic.sh, bin/kafka-create-topic.sh and create a new more powerful tool:
        jay@ahab:kafka> bin/kafka-topics.sh
        Command must include exactly one action: --list, --describe, --create, --delete, or --alter
        Option                                    Description
        ------                                    -----------
        --alter                                   Alter the configuration for the topic.
        --config <name=value>                     A topic configuration for this topic.
        --create                                  Create a new topic.
        --delete                                  Delete the topic.
        --describe                                List details for the given topics.
        --help                                    Print usage information.
        --list                                    List all available topics.
        --partitions <Integer: # of partitions>   The number of partitions for the topic.
        --replica-assignment                      A list of manual partition-to-broker
          <broker_id_for_part1_replica1 :           assignments.
           broker_id_for_part1_replica2 ,
           broker_id_for_part2_replica1 :
           broker_id_for_part2_replica2 , ...>
        --replication-factor <Integer:            The replication factor for each
           replication factor>                      partition in the topic.
        --topic <topic>                           The topic to be created.
        --zookeeper <urls>                        REQUIRED: The connection string for
                                                  the zookeeper connection in the form
                                                  host:port. Multiple URLs can be
                                                  given to allow fail-over.

        This command line tool can either list topics, describe topics, create topics, delete topics, or change the configuration for topics.

        Here is an example of creating two topics with overrides:
        ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic first_topic --topic second_topic --replication-factor 1 --partitions 4 --config segment.bytes=1073741824 --config retention.ms=1000000
        Created topic "first_topic".
        Created topic "second_topic".

        (Any command that takes a topic option can run on a list of topics by giving more than one topic flag.)

        ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
        first_topic
        second_topic

        ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic second_topic
        second_topic
        configs: segment.bytes = 1073741824, retention.ms = 1000000
        partitions: 4
        partition 0
        leader: 0 (ahab.linkedin.biz:9092)
        replicas: 0 (ahab.linkedin.biz:9092)
        isr: 0 (ahab.linkedin.biz:9092)
        partition 1
        leader: 0 (ahab.linkedin.biz:9092)
        replicas: 0 (ahab.linkedin.biz:9092)
        isr: 0 (ahab.linkedin.biz:9092)
        partition 2
        leader: 0 (ahab.linkedin.biz:9092)
        replicas: 0 (ahab.linkedin.biz:9092)
        isr: 0 (ahab.linkedin.biz:9092)
        partition 3
        leader: 0 (ahab.linkedin.biz:9092)
        replicas: 0 (ahab.linkedin.biz:9092)
        isr: 0 (ahab.linkedin.biz:9092)

        This configuration could be changed later for a topic by running
        ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic first_topic --config segment.bytes=673741824 --config retention.ms=500000
        Updated config for topic "first_topic".

        The implementation of the dynamic config is to add a new zookeeper path
        /config
        This path has two subdirectories
        /config/topics/<topic_name>
        and
        /config/changes
        The per-topic path contains any override properties specified for the topic stored in java.util.Properties format. If no overrides are given then that znode will not exist. The defaults are still taken from the server.properties file.

        The /config/changes path is used to reduce the number of watches required. Instead of keeping a watch on each config override znode, whenever we update a config entry we add a sequential entry under the changes directory containing the name of the topic whose config changed. Each broker keeps a watch on this directory and caches the last change it has executed. When the watch fires it executes any new config changes. Old change entries are garbage collected after 10 minutes. The config changes are managed by a new class TopicConfigManager which executes these changes.
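        The notification-handling loop described above can be sketched roughly as follows. This is not the actual TopicConfigManager code; an in-memory map stands in for ZooKeeper, and all names here are illustrative:

        ```scala
        // Sketch of the change-notification algorithm: sequential entries are
        // sorted, and only changes newer than the last executed one are applied.
        object ConfigChangeSketch {
          // sequential znode name -> topic whose config changed (stand-in for ZK)
          val changes = scala.collection.mutable.TreeMap[String, String]()
          var lastExecutedChange: Long = -1L
          val applied = scala.collection.mutable.ListBuffer[String]()

          // Stand-in for creating a sequential znode under /config/changes
          def notifyChange(seq: Long, topic: String): Unit =
            changes += ("config_change_%010d".format(seq) -> topic)

          // What the watch callback would do: sort the children (ZK gives no
          // ordering guarantee), then apply only changes past the cached point.
          def processConfigChanges(): Unit = {
            for (child <- changes.keys.toSeq.sorted) {
              val changeId = child.substring(child.lastIndexOf('_') + 1).toLong
              if (changeId > lastExecutedChange) {
                applied += changes(child) // broker would re-read the topic's config znode here
                lastExecutedChange = changeId
              }
            }
          }
        }
        ```

        Caching `lastExecutedChange` is what makes re-fired watches cheap: already-seen notifications are skipped without touching the per-topic config znodes.
        
        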

        This patch also has two refactorings:
        1. Renamed KafkaZookeeper to KafkaHealthcheck
        2. Moved logic for creating topics out of CreateTopicCommand and replaced it with two utilities in AdminUtils:
        def createTopic(zkClient: ZkClient,
                        topic: String,
                        partitions: Int,
                        replicationFactor: Int,
                        topicConfig: Properties = new Properties)

        def createTopicWithAssignment(zkClient: ZkClient,
                                      topic: String,
                                      partitionReplicaAssignment: Map[Int, Seq[Int]],
                                      config: Properties = new Properties)
        The first method will choose a partition assignment, and the second just sanity checks the assignment it is given.
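        The assignment-choosing step can be sketched as a simple round-robin placement. This is only the basic idea, not the actual AdminUtils logic (which also randomizes the starting broker and shifts replica offsets to spread leaders):

        ```scala
        // Hypothetical sketch: assign each partition a replica list by walking
        // the broker list round-robin. Returns partition -> ordered broker ids.
        def assignReplicas(brokers: Seq[Int],
                           partitions: Int,
                           replicationFactor: Int): Map[Int, Seq[Int]] = {
          require(replicationFactor <= brokers.size,
            "replication factor cannot exceed the number of brokers")
          (0 until partitions).map { p =>
            // replica r of partition p lands on the (p + r)-th broker, wrapping around
            p -> (0 until replicationFactor).map(r => brokers((p + r) % brokers.size))
          }.toMap
        }
        ```

        The sanity check in the second method would then amount to the same `require`-style validation run against a caller-supplied map.
        
        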

        I had originally planned to implement an RPC api to create and delete and alter topics, but I backed away from this since we don't seem to have a sane way to organize admin functionality yet.

        I think the first step in cleaning up is probably to refactor AdminUtils into a sane Admin client with methods that match the high-level administrative operations. This will still directly interact with zookeeper. This would be a reasonable starting point since one could at least then implement a web console that used this class even if the functionality was not available to other languages. But in any case this is beyond the scope of this patch.

        Jay Kreps made changes -
        Attachment KAFKA-554-v1.patch [ 12568654 ]
        Jun Rao added a comment -

        Thanks for patch v1. Looks good overall. Some comments:

        1. TopicCommand: Some options are only available for certain actions. Should we explain that in the description of those options?

        2. AdminUtils:
        2.1 comment: "is there is any" => "if there is any"
        2.2 We have standardized non-singleton values in ZK to JSON. Should the values stored in the topic config path be JSON too?

        3. TopicConfigManager:
        3.1 add the missing > in the following comment.

        • /brokers/topics/<topic_name/config
          3.2 startup(): It seems there is no need to make sure TopicConfigChangesPath exists here since that's covered in initZk() in KafkaServer.startup().
          3.3 processConfigChanges():
          3.3.1 How about using "processing config change notifications" in the following logging to make it more specific.
          info("Processing %d change notifications...".format(notifications.size))
          3.3.2 Reading the config from ZK can be done only if changeId > lastExecutedChange
          3.3.3 Not sure why we don't delete the sequential node corresponding to lastChangeId from ZK.
          3.3.4 It seems that sequential nodes under /brokers/config_changes are only deleted when there is a new config change. So, they are not always deleted after the configured expiration time.
          3.4. ConfigChangeListener.handleChildChange(): children are obtained from zk.getChildren(). There is no guarantee that the list is sorted. So, you will need to sort it yourself since ordering is important here.

        4. KafkaApis.handleOffsetCommitRequest(): The following statements
        val responseInfo = offsetCommitRequest.requestInfo.map( t => {
        val (topicAndPartition, metadataAndError) = t
        can be simplified to
        val responseInfo = offsetCommitRequest.requestInfo.map( case (topicAndPartition, metadataAndError) => {

        5. LogConfig: Can we define the two retentionPolicies "delete" and "dedupe" as constants and reuse them in LogConfig and KafkaConfig?

        6. RequestKeys: ModifyTopicKey is not used.

        7. ZkUtils: remove the following unused imports
        import java.util.Properties
        import java.io.{StringReader, StringWriter}

        8. PrimitiveApiTest: Instead of commenting out the following lines, should we just remove them?
        // temporarily set request handler logger to a higher level
        //requestHandlerLogger.setLevel(Level.FATAL)

        9. ReplicaFetchTest.logsMatch(): tandp is a bit confusing. Could we rename it to topicAndPart?

        The patch needs to be rebased. Some of the changes are no longer necessary after the patch that standardizes the ZK paths/values.

        Jay Kreps added a comment -

        Yeah I thought about json vs properties. The advantage i am getting out of properties is the ability to have non-stored defaults. Properties have the concept of defaults which are used but not written out. I could probably replicate that with some wrapper class but that might make the api a bit weird. The other option would be to use Properties in the api but just serialize it as JSON. That is the option I am leaning towards.
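        The java.util.Properties behavior Jay is referring to is easy to see in a minimal standalone example: defaults passed to the constructor are visible through getProperty but are never written out by store():

        ```scala
        import java.util.Properties
        import java.io.StringWriter

        // Defaults live in a separate Properties object chained behind the overrides.
        val defaults = new Properties()
        defaults.setProperty("segment.bytes", "1073741824")

        val overrides = new Properties(defaults)
        overrides.setProperty("retention.ms", "1000000")

        // Lookup falls back to the default...
        val visible = overrides.getProperty("segment.bytes")

        // ...but store() serializes only the explicitly set overrides.
        val out = new StringWriter()
        overrides.store(out, null)
        val stored = out.toString
        ```

        This is the "non-stored defaults" property that a plain JSON map would lose unless wrapped.
        
        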

        Neha Narkhede added a comment -

        I'm a little late to this review and seems like trunk has moved a lot. Will probably just wait for the rebased version.

        Jay Kreps added a comment -

        New patch v2. Addresses Jun's comments and rebases.

        1. TopicCommand: Made it a bit more clear which options apply to which actions.

        2. AdminUtils:
        2.1 Fixed typo
        2.2 Moved serialization to JSON. The APIs still take Properties instances as that is a hierarchical map. However the data stored in ZK is now JSON. One question is what the format of the change notifications should be. I added a proposal to the kafka zk wiki, please take a look.

        3. TopicConfigManager:
        3.1 add the missing > in the following comment.

        • /brokers/topics/<topic_name/config
          3.2 Yeah but this component should stand alone ideally.
          3.3.1 Done
          3.3.2 Good call, added that optimization.
          3.3.3 I wasn't sure if deleting all znodes would lead to the sequence starting over. I am told it won't so removed this.
          3.3.4 "It seems that sequential nodes under /brokers/config_changes are only deleted when there is a new config change. So, they are not always deleted after the configured expiration time." Yes. My rationale is that I want to clean up just to prevent notifications from piling up forever. But if no new notifications come in then they can't keep piling up. I could also do this by scheduling the deletion but the problem with that is knowing whether you have already scheduled it (you don't want to schedule it over and over). I could also add a background thread dedicated to cleanup, but it doesn't seem worth it for the added complexity.
          3.4. Nice call, sorted it
          4. Hmm, that doesn't actually compile for me.
          5. Not sure that helps. Config is a contract so we can't change these values in the future so just having the value is more straight-forward (one fewer layers of indirection)
          6. Removed ModifyTopicKey
          7. ZkUtils: removed imports
          8. Yeah, that slipped in. Removed.
          9. ReplicaFetchTest.logsMatch(): renamed tandp to topicAndPart.
        Jay Kreps made changes -
        Attachment KAFKA-554-v2.patch [ 12572616 ]
        Jay Kreps added a comment -

        Erp, that last patch was screwed up. This one should be right.

        Jay Kreps made changes -
        Attachment KAFKA-554-v3.patch [ 12572653 ]
        Jun Rao added a comment -

        For 4, you have to do the following: instead of using ( for map, use {.
        val responseInfo = offsetCommitRequest.requestInfo.map { case (topicAndPartition, metadataAndError) =>
        { ... }
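        For reference, the brace form works because a `{ case ... }` block is a pattern-matching anonymous function, which `map(...)` with parentheses cannot accept. A minimal standalone example (the names below are stand-ins, not the KafkaApis code):

        ```scala
        // map { case ... } destructures each key/value pair of the Map;
        // writing map( case ... ) instead is a syntax error.
        val requestInfo = Map("topicA" -> 1, "topicB" -> 2)
        val responseInfo = requestInfo.map { case (topic, partition) =>
          (topic, partition + 100) // stand-in for building the per-partition response
        }
        ```
        
        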
        Neha Narkhede added a comment -

        Json.scala

        • It seems like the map and sequence case inserts an extra "," for the last element as well. This will cause invalid JSON.

        KafkaServer

        • Does it make sense to start the health check at the end, after the request handlers and replica manager have started up?
        • In shutdown(), you are closing zkclient before the controller, replica manager, and log manager, which use zkclient. I think we should close it at the very end.
        Jay Kreps added a comment -

        V4 addresses Neha and Jun's comments.

        Jun, nice. Made that change.

        Neha, I think the sequence case is okay (.mkString() does the right thing). Added a unit test to cover that and other cases. Wrt the two ordering things, yes you are right, switched.
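        The mkString point can be seen in a minimal encoder sketch (not the actual Json.scala code; these helpers are hypothetical): the separator argument is only inserted between elements, so no trailing comma appears after the last one.

        ```scala
        // mkString(start, sep, end) joins elements with sep only *between* them,
        // so the last element never picks up a trailing comma.
        def encodeSeq(xs: Seq[Int]): String =
          xs.mkString("[", ",", "]")

        def encodeMap(m: Map[String, Int]): String =
          m.map { case (k, v) => "\"%s\":%d".format(k, v) }.mkString("{", ",", "}")
        ```
        
        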

        Jay Kreps made changes -
        Attachment KAFKA-554-v4.patch [ 12572850 ]
        Neha Narkhede added a comment -

        +1 on patch v4

        Jay Kreps added a comment -

        Cool, checked in. Jun let me know if you have any further comments and I will follow-up.

        Jay Kreps made changes -
        Status: Open → Resolved
        Assignee: Jay Kreps
        Resolution: Fixed
        Jun Rao added a comment -

        For debugging purpose, is it useful to be able to get the runtime config values used by each broker (e.g., through jmx)?


          People

          • Assignee: Jay Kreps
          • Reporter: Jay Kreps
          • Votes: 0
          • Watchers: 6