KAFKA-646

Provide aggregate stats at the high level Producer and ZookeeperConsumerConnector level

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels:

      Description

      With KAFKA-622, we measure ProducerRequestStats and FetchRequestAndResponseStats at the SyncProducer and SimpleConsumer level respectively. We could also aggregate them at the high level Producer and ZookeeperConsumerConnector level to provide an overall sense of request/response rate/size at the client level. Currently, I am not completely clear about the math that might be necessary for such aggregation, or whether metrics already provides an API for aggregating stats of the same type.

      We should also address the comments by Jun at KAFKA-622; I am copy-pasting them here:

      60. What happens if we have 2 instances of Consumers with the same clientid in the same JVM? Does one of them fail because it fails to register metrics? Ditto for Producers.
      61. ConsumerTopicStats: What if a topic is named AllTopics? We used to handle this by adding a - in topic-specific stats.
      62. ZookeeperConsumerConnector: Do we need to validate groupid?
      63. ClientId: Does the clientid length need to be different from topic length?
      64. AbstractFetcherThread: When building a fetch request, do we need to pass in brokerInfo as part of the client id? BrokerInfo contains the source broker info and the fetch requests are always made to the source broker.

      1. kafka-646-patch-num1-v7.patch
        70 kB
        Swapnil Ghike
      2. kafka-646-patch-num1-v6.patch
        71 kB
        Swapnil Ghike
      3. kafka-646-patch-num1-v5.patch
        71 kB
        Swapnil Ghike
      4. kafka-646-patch-num1-v4.patch
        64 kB
        Swapnil Ghike
      5. kafka-646-patch-num1-v3.patch
        64 kB
        Swapnil Ghike
      6. kafka-646-patch-num1-v2.patch
        64 kB
        Swapnil Ghike
      7. kafka-646-patch-num1-v1.patch
        65 kB
        Swapnil Ghike

        Activity

        Swapnil Ghike created issue -
        Swapnil Ghike made changes -
        Field Original Value New Value
        Labels bugs
        Swapnil Ghike made changes -
        Assignee Swapnil Ghike [ swapnilghike ]
        Swapnil Ghike added a comment -

        This patch has a bunch of refactoring changes and a couple of new additions.

        Addressing Jun's comments:
        These are all great catches! Thanks for being so thorough.

        60. By default, metrics-core will return an existing metric object of the same name using getOrCreate()-like functionality. As discussed offline, we should fail the clients that use an already registered clientId name. We will need to create two objects that contain hashmaps to record the existing producer and consumer clientIds, and methods to throw an exception if a client attempts to use an existing clientId. I worked on this change a bit, but it breaks a lot of our unit tests (about half) and the refactoring will take some time. Hence, I think it will be better if I submit a patch for all other changes and create another patch for this issue under this jira. Until then we can keep this jira open.
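The fail-fast behaviour proposed here could be sketched roughly as follows (a minimal illustration, not Kafka's actual API; `ClientIdRegistry` and its members are hypothetical names):

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: the first caller claims a clientId; a second caller
// reusing the same id fails fast instead of silently sharing metrics.
public class ClientIdRegistry {
    private static final ConcurrentHashMap<String, Boolean> inUse = new ConcurrentHashMap<>();

    public static void register(String clientId) {
        // putIfAbsent returns null only for the first registration of this id
        if (inUse.putIfAbsent(clientId, Boolean.TRUE) != null) {
            throw new IllegalArgumentException("clientId '" + clientId + "' is already registered");
        }
    }
}
```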

        61. For recording stats about all topics, I am now using a string "All.Topics". Since '.' is not allowed in the legal character set for topic names, this will differentiate from a topic named AllTopics.

        62. Yes, we should validate groupId. Added the functionality and a unit test. It has the same validation rules as ClientId.

        63. A metric name is something like (clientId + topic + some string), and this entire string is limited by filename size. We already allow a topic name to be at most 255 bytes long. We could fix max lengths for each of clientId, groupId and topic name so that the metric name never exceeds the filename size limit, but those lengths would be quite arbitrary; perhaps we should skip the check on the length of clientId and groupId.

        64. Removed brokerInfo from the clientId used to instantiate FetchRequestBuilder.

        Refactoring:
        1. Moved validation of clientId to the end of instantiation of ProducerConfig and ConsumerConfig.

        • Created static objects ProducerConfig and ConsumerConfig which contain a validate() method.

        2. Created global *Registry objects in which each high level Producer and Consumer can register their *stats objects.

        • These objects are registered in the static object only once using utils.Pool.getAndMaybePut functionality.
        • This will remove the need to pass *stats objects around the code in constructors. (I thought having the metrics objects right up in the constructors was a bit intrusive, since one doesn't always think about the monitoring mechanism while instantiating various modules of the program, for example while unit testing.)
        • Instead of the constructor, each concerned class obtains the *Stats objects from the global registry object.
        • This cleans up any metrics objects created in the unit tests.
        • Special mention: the producer constructors are back to their old selves. With clientId validation moved to the *Config objects, the intermediate Producer constructor that merely separated the parameters of a quadruplet is gone.
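The get-or-create registry described above could be sketched as follows (a hedged Java illustration: the Scala code uses utils.Pool.getAndMaybePut, and `ProducerRequestStatsRegistry` here is a stand-in, not the actual class):

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a global *Registry object. computeIfAbsent gives
// the same atomic get-or-create semantics as utils.Pool.getAndMaybePut, so
// every component asking for a given clientId's stats gets one shared object.
public class ProducerRequestStatsRegistry {
    // Stand-in for the real stats holder.
    public static class ProducerRequestStats {
        public final String clientId;
        ProducerRequestStats(String clientId) { this.clientId = clientId; }
    }

    private static final ConcurrentHashMap<String, ProducerRequestStats> pool =
            new ConcurrentHashMap<>();

    public static ProducerRequestStats getProducerRequestStats(String clientId) {
        // created at most once per clientId, no matter how many callers race
        return pool.computeIfAbsent(clientId, ProducerRequestStats::new);
    }
}
```

With this, no constructor needs a *stats parameter: each class fetches its stats object from the registry on demand.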

        3. Created separate files

        • for ProducerStats, ProducerTopicStats, ProducerRequestStats in kafka.producer package and for FetchRequestAndResponseStats in kafka.consumer package. Thought it was appropriate given that we already had ConsumerTopicStats in a separate file, and since the code for metrics had increased in size due to addition of Registry and Aggregated objects. Added comments.
        • for objects Topic, ClientId and GroupId in kafka.utils package.
        • to move the helper case classes ClientIdAndTopic, ClientIdAndBroker to kafka.common package.

        4. Renamed a few variables to clearer names (e.g. the anyOldName-to-metricId change).

        New additions:
        1. Added two objects to aggregate metrics recorded by SyncProducers and SimpleConsumers at the high level Producer and Consumer.

        • For this, changed KafkaTimer to accept a list of Timers. Typically we will pass a specificTimer and a globalTimer to this KafkaTimer class. Created a new KafkaHistogram in a similar way.
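The list-of-timers idea could look roughly like this (an illustrative sketch; `MultiTimer` and `SimpleTimer` are hypothetical names standing in for the KafkaTimer change and the metrics Timer):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hedged sketch of a KafkaTimer-like wrapper that updates several timers from
// one timed block (e.g. a broker-specific timer plus a client-wide aggregate).
public class MultiTimer {
    public interface SimpleTimer { void update(long nanos); }

    private final List<SimpleTimer> timers;

    public MultiTimer(SimpleTimer... timers) {
        this.timers = Arrays.asList(timers);
    }

    public <T> T time(Supplier<T> block) {
        long start = System.nanoTime();
        try {
            return block.get();
        } finally {
            long elapsed = System.nanoTime() - start;
            for (SimpleTimer t : timers) {
                t.update(elapsed); // each timer records the same duration
            }
        }
    }
}
```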

        2. Validation of groupId.

        Issues:
        1. Initializing the aggregator metrics with default values: for example, suppose a syncProducer is created (which registers a ProducerRequestStats mbean for it), but no request is ever sent by it. The absence of its data is then not reflected in the aggregator histogram: the min requestSize for the syncProducer that never sent a request will be 0, but this won't be accurately represented in the aggregator histogram. Thus, we need to understand that if the request count of a syncProducer is 0, its data will not be accurately reflected in the aggregator histogram.

        The question is whether it is possible to inform the aggregator histogram of some default values without increasing the request count of any syncProducer or the aggregated stats.
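The idle-producer problem can be illustrated with a toy aggregation (illustrative code only; `AggregateMin` and `Hist` are stand-ins, relying on the assumption that an empty histogram reports a min of 0):

```java
// Toy illustration of the issue above: an idle producer's histogram has no
// samples and reports min 0; naively folding that 0 into an aggregate would
// wrongly drag the aggregate minimum down. Skipping empty histograms avoids
// that, at the cost of not representing idle producers at all.
public class AggregateMin {
    public static class Hist {
        public long count = 0;
        public long min = 0; // 0 when no samples have been recorded

        public void update(long v) {
            min = (count == 0) ? v : Math.min(min, v);
            count++;
        }
    }

    public static long aggregateMin(Hist... hists) {
        long min = Long.MAX_VALUE;
        boolean any = false;
        for (Hist h : hists) {
            if (h.count > 0) { // ignore histograms that never saw a request
                min = Math.min(min, h.min);
                any = true;
            }
        }
        return any ? min : 0;
    }
}
```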

        Further proposed changes:
        Another patch under this jira to address comment 60 by Jun.

        Swapnil Ghike made changes -
        Attachment kafka-646-patch-num1-v1.patch [ 12559995 ]
        Swapnil Ghike added a comment -

        Actually I just realized that the Aggregated*Stats objects that I have created are not consistent with the way "All.Topics-MessageRate" is measured. It is possible to measure "All.Brokers-producerRequestSize" in a similar way in ProducerRequestStats.
        But it's not possible to measure "All.Brokers-ProduceRequestRateAndTimeMs" in the same manner since we use a timer block.

        To make everything look consistent, I can delete the aggregator objects from my v1 patch and create a KafkaMeter class that accepts a list of meters. Will upload another version of patch.

        Swapnil Ghike added a comment - edited

        Attached patch v2.

        The changes from patch v1:
        1. Deleted KafkaHistogram class. (There is also no need for KafkaMeter class.)

        2. Deleted the Aggregated*Stats objects.

        • The metrics of SyncProducer and SimpleConsumer for different brokers are aggregated in the same way that the producerTopicStats are aggregated for "All.Topics".
        • Measuring the time for produce requests and fetch requests is achieved by passing a list of timers to KafkaTimer.
        Swapnil Ghike made changes -
        Attachment kafka-646-patch-num1-v2.patch [ 12560022 ]
        Neha Narkhede added a comment -

        Patch v2 looks good to me. A few minor questions -
        1. Producer
        You probably don't validate the client id anymore in the secondary constructor. Shouldn't we do that ?

        2. ZookeeperConsumerConnector
        consumerTopicStats is unused

        3. Do the singleton validate() APIs need to be synchronized ?

        Swapnil Ghike added a comment -

        Thanks for reviewing.

        Patch v3:
        1. Oh, that's because clientId is validated at the end of ProducerConfig constructor.

        2. Removed it.

        3. Currently the validate() APIs only check for illegal chars and they don't yet check whether the incoming clientId has already been taken. (I am planning to do it in a separate patch in the same jira, after this patch has been checked in).

        Swapnil Ghike made changes -
        Attachment kafka-646-patch-num1-v3.patch [ 12560279 ]
        Swapnil Ghike added a comment -

        Fixed a typo in FetchRequestAndResponseStats mbean creation.

        Swapnil Ghike made changes -
        Attachment kafka-646-patch-num1-v4.patch [ 12560292 ]
        Jun Rao added a comment -

        Thanks for patch v4. Looks good overall. A few minor comments:

        40. GroupId,ClientId: The validation code is identical. Could we combine them into one utility? We can throw a generic InvalidConfigurationException with the right text.

        41. The patch doesn't apply because of changes in system_test/testcase_to_run.json. Do you actually intend to change this file?

        Swapnil Ghike added a comment -

        Patch v5:

        40. I agree with you. Created a new trait Config in the common package; it has a method that can validate a clientId or a groupId. ProducerConfig and ConsumerConfig extend this trait and have their own additional methods to validate config values that are specific to themselves.

        • Moved config value validations in ZookeeperConsumerConnector and Producer to ConsumerConfig and ProducerConfig objects respectively, since we can throw an InvalidConfigException when the Config is getting instantiated.
        • Moved Topic and TopicTest to common package.
        • Added a ConfigTest to common package.
        • Removed InvalidClientException and InvalidGroupException. Instead I am now re-using the existing InvalidConfigException to print the appropriate message.
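The shared validation described above could be sketched as follows (a hedged illustration; `ConfigValidation` and `validateChars` are hypothetical names, and the legal character set shown is an assumption for illustration, not necessarily Kafka's exact rule):

```java
// Hypothetical sketch: one helper validates any identifier (clientId or
// groupId) and throws a generic InvalidConfigException naming the offending
// property and value, replacing the per-type exception classes.
public class ConfigValidation {
    public static class InvalidConfigException extends RuntimeException {
        public InvalidConfigException(String message) { super(message); }
    }

    // Assumed legal set for this sketch: letters, digits, '_' and '-'.
    public static void validateChars(String prop, String value) {
        if (!value.matches("[a-zA-Z0-9_-]*")) {
            throw new InvalidConfigException(
                prop + " = " + value + " contains characters outside [a-zA-Z0-9_-]");
        }
    }
}
```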

        41. It automatically got changed when I ran the sanity test. John probably has a patch that will add another file, run_test; he says we can use it once it is checked in.

        Swapnil Ghike made changes -
        Attachment kafka-646-patch-num1-v5.patch [ 12560434 ]
        Neha Narkhede added a comment -

        +1 on v5. I think we can check this in and wait for v6 addressing the rest of the issues?

        Jun Rao added a comment -

        Thanks for patch v5. A few more comments:

        50. Config:
        50.1 Could we rename validateClientIdOrGroupId to something more general like validateAlphaNumericString?
        50.2 We should add a separator between prop and value in the message string of InvalidConfigException.

        51. ConsumerConfig: In validateAutoOffsetReset(), let's add the autoOffsetReset value in the message string of InvalidConfigException.

        52. ProducerConfig: In validateProducerType(), let's add the producerType value in the message string of InvalidConfigException.

        Swapnil Ghike added a comment -

        Made the changes.

        Swapnil Ghike made changes -
        Attachment kafka-646-patch-num1-v6.patch [ 12560529 ]
        Swapnil Ghike added a comment -

        Including in patch v7 Joel's suggestion at KAFKA-604 to measure time as follows:

            aggregatekafkatimer.time {
              specifickafkatimer.time {
                <code block>
              }
            }

        This style is probably ok, since we don't have a use case where we use more than two timers to time the same block. This also removes the need to modify KafkaTimer.
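As a runnable illustration of that nesting (with a stand-in `Timer` class, since the real code uses the metrics library's Timer; names here are illustrative):

```java
import java.util.function.Supplier;

// Stand-in Timer demonstrating the nested style above: the aggregate timer's
// block wraps the specific timer's block, so one pass through the code
// updates both with (nearly) the same duration.
public class NestedTimers {
    public static class Timer {
        public long count = 0;
        public long totalNanos = 0;

        public <T> T time(Supplier<T> block) {
            long start = System.nanoTime();
            try {
                return block.get();
            } finally {
                totalNanos += System.nanoTime() - start;
                count++;
            }
        }
    }
}
```

The outer timer always measures at least as much time as the inner one, since its block fully encloses the inner block.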

        Swapnil Ghike made changes -
        Attachment kafka-646-patch-num1-v7.patch [ 12560615 ]
        Neha Narkhede added a comment -

        Thanks for v7. Just checked it in.

        Neha Narkhede made changes -
        Status Open [ 1 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Neha Narkhede made changes -
        Status Resolved [ 5 ] Closed [ 6 ]

          People

          • Assignee:
            Swapnil Ghike
            Reporter:
            Swapnil Ghike
          • Votes: 0
          • Watchers: 3
