KAFKA-755

standardizing json values stored in ZK

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:

      Description

      Currently, we have the following paths in ZK that store non-singleton values.

      1. Topic assignment value:
      /brokers/topics/topic

      { "0": ["0"] }

      2. LeaderAndISR info:
      /brokers/topics/test/partitions/0/leaderAndISR

      { "ISR":"0,1","leader":"0","controllerEpoch":"1","leaderEpoch":"0" }

      3. broker registration:
      /brokers/ids/0
      192.168.1.148:9092:9999

      4. partition reassignment path

      It would be good if we do the following:
      a. make them true JSON (e.g., using a number as the value for broker/partition, instead of a string).
      b. add version support for future growth.
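As a sketch of points (a) and (b), the topic assignment value could carry real JSON numbers for broker ids plus an explicit version field. The field names below ("version", "partitions") are illustrative, not necessarily the committed schema:

```python
import json

# Old style: broker ids stored as strings inside JSON
old_value = {"0": ["0"]}

# Standardized style (illustrative): numeric broker ids plus a version field
new_value = {"version": 1, "partitions": {"0": [0, 1]}}

# Serializing with sorted keys yields deterministic, truly-typed JSON
serialized = json.dumps(new_value, sort_keys=True)
print(serialized)  # {"partitions": {"0": [0, 1]}, "version": 1}
```

The version field lets future readers dispatch on the schema revision instead of guessing from the shape of the data.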

      Attachments

      1. kafka-755-v1.patch
        47 kB
        Swapnil Ghike
      2. kafka-755-v2.patch
        47 kB
        Swapnil Ghike
      3. kafka-755-v3.patch
        48 kB
        Swapnil Ghike

        Activity

        Jun Rao added a comment -

        Thanks for patch v3. +1 and committed to 0.8 branch.

        Swapnil Ghike added a comment -

        Made the changes and rebased.

        writePreferredReplicaElectionData is a bit clunky to read currently; I will take care of it while completing the remaining changes for items 9 and 10.

        Jun Rao added a comment -

        Thanks for patch v2. A few more comments.

        20. Broker: Can we add brokerInfoString in the following exception?
        throw new KafkaException("Failed to parse the broker info from zookeeper", t)

        21. TopicCount:
        21.1 Can we define "white_list" and "black_list" as constants and reference only the constants instead of the strings?
        21.2 Can we rename specialList to subscriptionPattern?

        22. Utils: The comment for mapWithSeqValuesToJson() needs to be changed.

        Could you rebase too?

        Swapnil Ghike added a comment -

        Addressed all points.

        i. Renamed valueAsString flag to valueInQuotes for clarity.
        ii. Json fields should appear in sorted order now.
        iii. Changed the whitelist/blacklist detection to use the "pattern" field in the zk json. Accordingly modified the createTopicCount and dbString methods.

        Testing done -
        1. Testing done for v1 patch.
        2. Tried all of non-wildcard, whitelist and blacklist topics with console consumer. Zk data looks ok (it does not include * for non-wildcard topics). Also changed the wiki to reflect this, previously it had entries like "*abc" and "!abc" for wildcard consumer subscription fields.
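The sorted-field behaviour in (ii) can be illustrated with Python's json module (the actual patch implements this in the Scala Utils code; this snippet is only an analogy, and the field values are taken from the example consumer registration discussed below):

```python
import json

registration = {"version": 1, "subscription": {"*test": 1}, "pattern": "white_list"}

# Emitting fields in sorted order makes the stored ZK value deterministic,
# so two writers producing the same logical value produce identical bytes.
sorted_json = json.dumps(registration, sort_keys=True)
print(sorted_json)
```
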

        Jun Rao added a comment -

        Thanks for the patch. Some comments.

        1. Broker.createBroker(): We should catch all json parsing related exceptions and throw a KafkaException with the problematic json string in it.

        2. ZkUtils.partitionReplicasZkData: should we rename it to replicaAssignmentZkData?

        3. TopicCount: Instead of throwing RuntimeException when hitting json errors, it's probably better to throw KafkaException.

        4. Utils:
        4.1 I recommend renaming putStringValuesInJsonObject() to mapToJson().
        4.2 I recommend renaming putStringSeqInJsonArray() to arrayToJson() and adding a valueAsString flag too.
        4.3 I recommend renaming putIntSeqValuesInJson() to mapWithSeqValuesToJson().

        5. ZkUtils:
        5.1 registerBrokerInZK(): broker is no longer referenced
        5.2 There are 2 getReplicaAssignmentForTopics methods and the following one is not used. Let's remove it.
        def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]):

        6. ZookeeperConsumerConnector: the line calling mergeJsonObjects is too long

        7. ZookeeperConsumerConnectorTest: no real change

        8. The following map is not sorted by field names.
        /consumers/console-consumer-8915/ids/console-consumer-8915_jrao-ld-1361498895428-3268a767
        { "subscription": { "*test" : 1 }, "version":1, "pattern":"white_list" }
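Point 1 above (surface the problematic JSON string in the exception) can be sketched as follows; the real change lives in Scala's Broker.createBroker() and throws KafkaException, so the function and exception names here are illustrative only:

```python
import json

def parse_broker_info(broker_info_string):
    """Parse a broker registration value, quoting the raw string on failure."""
    try:
        return json.loads(broker_info_string)
    except ValueError as t:
        # Include the problematic value so operators can see what ZK actually held
        raise RuntimeError(
            "Failed to parse the broker info from zookeeper: %s" % broker_info_string
        ) from t
```

With the raw value embedded in the message, a malformed registration can be diagnosed from the log alone instead of requiring a manual ZK inspection.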

        Swapnil Ghike added a comment -

        This patch standardizes the reading and writing to zookeeper values according to the JSON schemas 1 thru 8 defined in the aforementioned wiki.

        The changes in this patch involve a bunch of refactoring to make this standardization possible.

        This patch does not touch any other use of JSON-like structures in the code that does not involve interacting with zookeeper.

        Testing done:

        • Unit tests pass.
        • Nuke all the zookeeper data. Start kafka server, produce data using a producer and start a consumer. Observe the zookeeper data.

        It would be good if we could check in this 1st patch and let the system test run over it. I will create another patch to make the changes for item 9 and 10 in the wiki as and when my hands are free of other blockers. 9 and 10 will be used only by our tools and are standalone changes in the sense that we can simply delete and recreate the related zookeeper namespaces without disturbing our clients.

        Neha Narkhede added a comment -

        I updated the wiki with the proposed zookeeper path and format changes for preferred replica election and partition reassignment

        Jun Rao added a comment -

        Updated the wiki. We need to patch value for the following path too.

        Consumer registration:

        /consumers/[groupId]/ids/[consumerId]

        Jun Rao added a comment -

        Proposed the new format in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
        Jay Kreps added a comment -

        Also
        c. Think through how any generalization would work (e.g., (1) can't really be extended beyond partitions because the partitions are at the top level of the object).
        d. Standardize capitalization: we have dashes, underscores, camel case and pretty much everything else.
        e. Document them and, going forward, add to the documentation. This will ensure future additions are thought through holistically with all the other structures.


          People

          • Assignee: Swapnil Ghike
          • Reporter: Jun Rao
          • Votes: 0
          • Watchers: 4
