
KAFKA-385: RequestPurgatory enhancements - expire/checkSatisfied issue; add JMX beans

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels:

      Description

      As discussed in KAFKA-353:
      1 - There is potential for a client-side race condition in the implementations of expire and checkSatisfied. We can just synchronize on the DelayedItem.
      2 - Would be good to add JMX beans to facilitate monitoring of RequestPurgatory stats.
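
      A minimal sketch of the fix for (1), with hypothetical class and method names (the actual RequestPurgatory code differs): both the satisfaction path and the expiration path synchronize on the delayed item, so only one of them can complete a given request.

          // Illustrative only - not the actual Kafka classes.
          class DelayedRequest(val keys: Seq[Any], val delayMs: Long) {
            // Set to true by whichever path (satisfaction or expiration) wins the race.
            var satisfied = false
          }

          abstract class PurgatorySketch[T <: DelayedRequest] {
            def checkSatisfied(request: T): Boolean   // client-supplied satisfaction check
            def expire(request: T): Unit              // client-supplied expiration callback

            // Watcher path: called when new data may satisfy a pending request.
            def maybeSatisfy(request: T): Boolean = request synchronized {
              if (!request.satisfied && checkSatisfied(request)) {
                request.satisfied = true
                true
              } else false
            }

            // Reaper path: called once the request's delay has elapsed.
            def maybeExpire(request: T): Unit = request synchronized {
              if (!request.satisfied) {
                request.satisfied = true
                expire(request)
              }
            }
          }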

      Attachments

      1. example_dashboard.jpg
        206 kB
        Joel Koshy
      2. graphite_explorer.jpg
        122 kB
        Joel Koshy
      3. KAFKA-385-v1.patch
        72 kB
        Joel Koshy
      4. KAFKA-385-v2.patch
        61 kB
        Joel Koshy
      5. KAFKA-385-v3.patch
        68 kB
        Joel Koshy
      6. KAFKA-385-v3.patch
        68 kB
        Joel Koshy
      7. KAFKA-385-v3-with-lazy-fix.patch
        7 kB
        Joel Koshy
      8. KAFKA-385-v4.patch
        68 kB
        Joel Koshy

        Activity

        Joel Koshy added a comment -

        Summary of changes and notes:

        1 - Fixed the synchronization issue (raised in KAFKA-353) between
        checkSatisfied and expire by synchronizing on the DelayedItem.

        2 - Added request purgatory metrics using the metrics-core library (see the
        sketch at the end of this comment). Also added support for csv/ganglia/graphite
        reporters, which I think is useful - e.g., I attached a graphite dashboard that
        was pretty easy to whip up. It should be a breeze to use metrics-core for other
        stats in Kafka.

        3 - This brings in dependencies on metrics and slf4j, both with Apache-compatible
        licenses. I don't know of any specific best practices for using metrics-core as
        I have not used it before, so it would be great if people with experience using
        it could glance over this patch.

        4 - It's a bit hard to tell right now which metrics are useful and which are
        pointless/redundant. We can iron that out over time.

        5 - Some metrics are global only and some are both global and per-key (which I
        think is useful to have, e.g., to get a quick view of which partitions are
        slower). For example, it helped to see (in the attached screenshots) that fetch
        requests were all expiring - and it turned out to be a bug in how DelayedFetch
        requests from followers are checked for satisfaction. The issue is that
        maybeUnblockDelayedFetch is only called if the required acks is 0/1. We need to
        call it always - in the FetchRequestPurgatory checkSatisfied method, if it is a
        follower request we need to use the logEndOffset to determine the bytes
        available to the fetch request, and the HW if it is a non-follower request. I
        fixed it to always check availableFetchBytes, but it can be made a little more
        efficient by having the DelayedFetch request keep track of the currently
        available bytes for each topic-partition key.

        6 - I realized that both the watchersForKey and per-key metrics pools keep
        growing. It may be useful to have a simple garbage collector in the Pool
        class that garbage collects entries that are stale (e.g., due to a
        leader-change), but this is non-critical.

        7 - I needed to maintain DelayedRequest metrics outside the purgatory, because
        the purgatory itself is abstract and does not have internal knowledge of delayed
        requests and their keys. Note that these metrics are for delayed requests only -
        i.e., they are not updated for requests that are satisfied immediately without
        going through the purgatory.

        8 - There is one subtlety with producer throughput: I wanted to keep per-key
        throughput, so the metric is updated on individual key satisfaction. This does
        not mean that the DelayedProduce itself will be satisfied - i.e., what the
        metric reports is an upper bound since some DelayedProduce requests may have
        expired.

        9 - I think it is better to wait for KAFKA-376 to go in first. In this patch, I
        hacked a simpler version of that patch - i.e., in availableFetchBytes, I check
        the logEndOffset instead of the high watermark. Otherwise, follower fetch
        requests would see zero available bytes. Of course, this hack now breaks
        non-follower fetch requests.

        10 - KafkaApis is getting pretty big - I can try and move DelayedMetrics out
        if that helps although I prefer having it inside since all the
        DelayedRequests and purgatories are in there.

        11 - There may be some temporary edits to start scripts/log4j that I will
        revert in the final patch.

        What's left to do:

        a - This was a rather painful rebase, so I need to review in case I missed
        something.

        b - Optimization described above: DelayedFetch should keep track of
        bytesAvailable for each key and FetchRequestPurgatory's checkSatisfied
        should take a topic, partition and compute availableBytes for just that
        key.

        c - The JMX operations to start and stop the reporters are not working
        properly. I think I understand the issue, but will fix later.
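
        A minimal sketch of the kind of meter/histogram registration referred to in item 2 above, assuming the metrics-core 2.x API; the class and metric names here are illustrative, not the ones in the patch.

            import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
            import com.yammer.metrics.Metrics
            import com.yammer.metrics.core.Meter

            // Illustrative only: global meters/histograms plus lazily created per-key meters.
            class DelayedRequestMetricsSketch {
              private val expirationMeter =
                Metrics.newMeter(getClass, "ExpiredRequestsPerSecond", "requests", TimeUnit.SECONDS)
              private val satisfactionTimeHist =
                Metrics.newHistogram(getClass, "SatisfactionTimeMs", true) // true = biased sample

              private val perKeyMeters = new ConcurrentHashMap[String, Meter]

              def markExpired(): Unit = expirationMeter.mark()

              def markSatisfied(key: String, timeMs: Long): Unit = {
                satisfactionTimeHist.update(timeMs)
                var meter = perKeyMeters.get(key)
                if (meter == null) {
                  // Register a per-key meter on first use (see item 6 for the growth concern).
                  perKeyMeters.putIfAbsent(key,
                    Metrics.newMeter(getClass, key + "-SatisfiedRequestsPerSecond", "requests", TimeUnit.SECONDS))
                  meter = perKeyMeters.get(key)
                }
                meter.mark()
              }
            }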

        Jun Rao added a comment -

        Thanks for the patch. Overall, it seems that metrics-core is easy to use. Some comments:

        1. KafkaMetricsGroup.metricName: add a comment on the following statement and explain what it does
        actualPkg.replaceFirst("""\.""", ".%s.".format(ident))

        2. Unused imports: KafkaApis, KafkaConfig

        3. Pool.getAndMaybePut(): This seems to force the caller to create a new value object on each call and in most cases, the new object is not needed.

        4. FetchRequestKey and ProduceRequestKey: can they be shared?

        5. Is it better to put all metrics classes in one package metrics?

        6. It's useful to have topic level stats. I am wondering if we can just keep stats at the global and the topic level, but not at topic/partition level.

        7. DelayedProducerRequestMetrics, DelayedFetchRequestMetrics: Can we name the stats consistently? Something like FetchSatisfiedTime and ProduceSatisfiedTime, FetchSatisfiedRequest and ProduceSatisfiedRequest. Also, there is some overlap with the stats in BrokerTopicStat.

        8. ProducerPerformance: Why force producer acks to be 2? Shouldn't that come from the command line?

        9. The changes and the new files in config/: Are they needed?

        Joel Koshy added a comment -

        Thanks for the review.

        1. Will do.

        2. Do you remember which class? I can run an optimize imports everywhere, so
        there may be some noise in the final patch.

        3. Oops - yes that's true. I'll fix this to accept a factory method that
        creates the object if absent.

        4. Yes - I'll combine them.

        5. I had considered this, but I think it is better to keep only generic stuff in the metrics package and keep concrete metrics close to where they are used. I think we can decide this as part of KAFKA-203 since that
        may result in a more elaborate wrapper around metrics-core than I have
        now in the kafka.metrics package.

        6/7. The issue is that produce/fetch request stats are asymmetric. E.g., per-key expiration stats do not make sense for DelayedFetch requests, but they do make sense for DelayedProduce requests. Also, the caught-up fetch
        request rps and duration stats for the producer are updated on a per-key
        basis, which is why they are named as such - i.e., they do not exactly equal
        the satisfaction time. I can add an additional stat that does track
        satisfaction time for completely satisfied produce requests. There is some
        overlap with BrokerTopicStat, but as I mentioned in (7) of my first comment
        these stats only account for DelayedRequests. As for topic-level stats,
        throughput stats would help, but are satisfaction/expiration stats (especially
        with multi-produce) useful since those events occur at the key (partition)
        level?

        8. Yes - will revert. This was done before Neha added that option in
        KAFKA-350.

        9. Not required - I used it for testing only. Will revert.

        Joel Koshy added a comment -

        Also, re: my comment concerning the dependency on KAFKA-376: this is not strictly required - KAFKA-350 actually made a similar temporary fix to use the log end offset instead of hw.

        Joel Koshy added a comment -

        The JIRA system was down around the time of my last comment, so it was not
        sent to the mailing list. That explains some of the updates below.

        1 - Added produce satisfied stats.
        2 - Common request key for fetch/produce.
        3 - Factory method for getAndMaybePut.
        4 - Added doc describing fudged name for KafkaMetricsGroup.
        5 - Fixed the issues with the JMX operations I had earlier.
        6 - Reverted the ProducerPerformance and temporary config changes.

        I decided against doing the optimization for the FetchRequestPurgatory
        checkSatisfied as I don't think it makes a significant difference - the
        available bytes computation is done pretty much in memory.

        One more comment on the usage of metrics: it is possible to combine
        rate/histogram metrics into a metrics Timer object. I chose not to do this
        because the code turns out a little cleaner by using a meter/histogram, but
        will revisit later.

        Joel Koshy added a comment -

        Just want to add a comment on metrics overheads of using histograms, meters
        and gauges.

        Meters use an exponential weighted moving average and don't need to maintain
        past data. Histograms are implemented using reservoir sampling and need to
        maintain a sampling array. There is only one per-key histogram - the
        "follower catch-up time" histogram in DelayedProducerRequestMetrics. There
        are four more global histograms.

        The sampling array is 1028 atomic longs by default. So if we have, say, 500
        topics with four partitions each and four brokers, and assume that leadership
        is evenly spread out, a lower bound on memory (ignoring object overheads and
        the global histograms) would be ~4 MB per broker.

        Also, after looking at the metrics code a little more, I think we should use
        the exponentially decaying sampling option. By default, the histogram uses a
        uniform sample - i.e., it effectively takes a uniform sample over all the data
        points seen. Exponentially decaying sampling gives more weight to the past
        five minutes of data, but it would use at least double the memory of uniform
        sampling, which pushes memory usage to over 8 MB.
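
        A back-of-the-envelope check of the ~4 MB figure above (ignoring object overheads and the global histograms, as stated):

            object HistogramMemoryEstimate extends App {
              val bytesPerHistogram = 1028 * 8           // 1028 longs at 8 bytes each, ~8.2 KB
              val partitionsLedPerBroker = 500 * 4 / 4   // 500 topics x 4 partitions spread over 4 brokers
              val lowerBoundBytes = partitionsLedPerBroker.toLong * bytesPerHistogram
              println(lowerBoundBytes)                   // 4112000, i.e. ~4 MB per broker
            }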

        Neha Narkhede added a comment -

        Patch v2 doesn't apply cleanly on a fresh checkout of 0.8 -

        nnarkhed-ld:kafka-385 nnarkhed$ patch -p0 -i ~/Projects/kafka-patches/KAFKA-385-v2.patch
        patching file core/src/main/scala/kafka/Kafka.scala
        patching file core/src/main/scala/kafka/api/FetchResponse.scala
        patching file core/src/main/scala/kafka/metrics/KafkaMetrics.scala
        patching file core/src/main/scala/kafka/metrics/KafkaMetricsConfigShared.scala
        patching file core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
        patching file core/src/main/scala/kafka/server/KafkaApis.scala
        Hunk #4 FAILED at 160.
        Hunk #12 FAILED at 360.
        2 out of 24 hunks FAILED -- saving rejects to file core/src/main/scala/kafka/server/KafkaApis.scala.rej
        patching file core/src/main/scala/kafka/server/KafkaConfig.scala
        Hunk #1 succeeded at 24 with fuzz 2 (offset 2 lines).
        Hunk #2 succeeded at 36 with fuzz 1 (offset 2 lines).
        Hunk #3 succeeded at 139 (offset 2 lines).
        patching file core/src/main/scala/kafka/server/RequestPurgatory.scala
        patching file core/src/main/scala/kafka/utils/Pool.scala
        patching file core/src/main/scala/kafka/utils/Utils.scala
        Hunk #1 succeeded at 502 (offset 1 line).
        patching file core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
        patching file core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
        patching file core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
        patching file project/build/KafkaProject.scala

        Joel Koshy added a comment -

        Yes - looks like KAFKA-369 went in yesterday. I will need to rebase now.

        Jay Kreps added a comment -

        1. Can we make the reporters pluggable? We shouldn't hard-code those; you should just give something like
        metrics.reporters=com.xyz.MyReporter, com.xyz.YourReporter
        2. Please remove the reference to Scala class names from the logging (e.g., DelayedProduce).
        3. How are you measuring the performance impact of the change you made to synchronization?
        4. It would be good to cut-and-paste the Scala timer class they provide in the Scala wrapper. That is really nice.
        5. Size.
        I think the overhead is the following:
        8 byte pointer to the value
        12 byte object header
        8 byte value
        Total: 28 bytes

        This is too much memory for something that should just be monitoring. I think we should not do per-key histograms.
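
        A quick check of how that per-slot overhead scales, reusing the 500-partitions-per-broker scenario from the earlier comment (figures from the breakdown above):

            object PerKeyHistogramCost extends App {
              val bytesPerSample = 8 + 12 + 8            // pointer + object header + long value
              val samplesPerHistogram = 1028             // default reservoir size
              val keysPerBroker = 500 * 4 / 4            // scenario from the earlier comment
              val totalBytes = keysPerBroker.toLong * samplesPerHistogram * bytesPerSample
              println(totalBytes)                        // 14392000, i.e. ~14 MB per broker
            }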

        Joel Koshy added a comment -

        I'm about to upload a rebased patch so I'd like to incorporate as much of this as I can.

        > 1. Can we make the reporters pluggable? We shouldn't hard code those, you should just give something like
        metrics.reporters=com.xyz.MyReporter, com.xyz.YourReporter

        These (and ConsoleReporter) are the only reporters currently available. It would be good to make them pluggable, but their constructors are different - i.e., I don't think it is possible to do this without using a Spring-like framework.

        > 2. Please remove the reference to scala class names from the logging (e.g. DelayedProduce)

        I don't follow this comment - can you clarify? Which file are you referring to?

        > 3. How are you measuring the performance impact of the change you made to synchronization?

        I did not - I can measure the satisfaction time with and without the synchronization, but I doubt it would add much overhead. Also, we have to add synchronization one way or the other - either inside the purgatory or outside (i.e., burden the client's usage).

        > 4. It would be good to cut-and-paste the scala timer class they provide in the scala wrapper. That is really nice.

        They == ? Not clear on this - can you clarify?

        > 5. Size.
        I think the overhead is the following:
        8 byte pointer to the value
        12 byte object header
        8 byte value
        Total: 28 bytes

        > This is too much memory for something that should just be monitoring. I think we should not do per-key histograms.

        This is certainly a concern. With the scenario in my previous comment, this would be > 13 MB per broker and more than double that if I use exponentially decaying sampling. That said, there is only one per-key histogram (the follower catch-up time). OTOH, since the main use I can think of is to see which followers are slower, we can achieve that with greps in the log. So I guess the visual benefit comes at a prohibitive memory cost.

        Joel Koshy added a comment -

        Changes over v2:

        • Rebased (twice!)
        • Switched the remaining (global) histograms to biased histograms.
        • Addressed Jay's comments:
        • 1: actually there's a workaround - basically pass through the properties
          to the custom reporter (I provided an example, KafkaCSVMetricsReporter). If
          JMX operations need to be exposed, the custom reporter will need to implement
          an mbean trait that extends from KafkaMetricsReporterMBean. I did this to
          avoid having to implement the DynamicMBean interface. Since we now have
          pluggable reporters, I removed the KafkaMetrics class and the dependency on
          metrics-ganglia and metrics-graphite.
        • 2: changed the logging statements in KafkaApis to just say producer
          requests/fetch requests.
        • 3: I did a quick test as described above, but couldn't see any
          measurable impact.
        • 4: Added KafkaTimer (see the sketch after this list) and a unit test (which
          I'm thinking of removing as it is pretty dumb other than showing how to use
          it).
        • 5: Got rid of the per-key follower catch-up time histogram from
          DelayedProduceMetrics. Furthermore, meters are inexpensive and the per-key
          caught-up follower request meter should be sufficient.
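
        For reference, a sketch of the by-name timer wrapper mentioned in item 4 above, assuming the metrics-core 2.x Timer API; the committed KafkaTimer may differ in detail.

            import com.yammer.metrics.core.Timer

            // Times an arbitrary block of code and records its duration, even if it throws.
            class KafkaTimerSketch(metric: Timer) {
              def time[A](f: => A): A = {
                val ctx = metric.time()
                try f
                finally ctx.stop()
              }
            }

            // Usage: new KafkaTimerSketch(someTimer).time { handleRequest(request) }
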
        Jay Kreps added a comment -

        +1

        Joel Koshy added a comment -

        I forgot to update the running flag in the example reporter - will fix that before check-in.

        Joel Koshy added a comment -

        Jun had brought up one more problem - the factory method for getAndMaybePut
        doesn't really solve the problem of avoiding instantiation of objects that are
        already present in the Pool, since the anonymous function that I use needs to
        instantiate the object. I tweaked the code to use lazy vals and used logging to
        verify that each object in the Pool is instantiated only once.

        From what I understand, the lazy val implementation effectively uses a
        synchronized bitmap to keep track of whether a particular val has been created
        or not. However, I'm not so sure how it works if the val involves a parameter,
        e.g., lazy val factory = new MyClass(param) as opposed to
        lazy val factory = new MyClass. The concern is that Scala may need to create
        some internal wrapper classes (at runtime). I tried disassembling the bytecode
        but did not want to spend too much time on it - so I thought I'd ask if anyone
        knows the details of how lazy vals work when the actual instance is only known
        at runtime.
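
        A toy example of the lazy-val behavior in question (purely illustrative, not Kafka code): the initializer captures a runtime parameter but runs at most once, and only if the val is actually touched.

            object LazyValDemo extends App {
              class KeyMetrics(key: String) {
                println("instantiating metrics for " + key)
              }

              def maybeMetrics(key: String, wanted: Boolean): Option[KeyMetrics] = {
                // Captures the runtime parameter `key`; evaluated lazily on first access.
                lazy val metrics = new KeyMetrics(key)
                if (wanted) Some(metrics) else None
              }

              maybeMetrics("topic-0", wanted = false)  // prints nothing
              maybeMetrics("topic-0", wanted = true)   // prints "instantiating metrics for topic-0"
            }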

        Joel Koshy added a comment -

        Attached an incremental patch over v3 to illustrate.

        Jun Rao added a comment -

        For the issue with getAndMaybePut(), can we add an optional createValue() method to the constructor of Pool? If an object doesn't exist for a key, Pool can call createValue() to create a new value object from key.

        A few other comments:
        30. KafkaConfig: brokerid should be a required property. So we shouldn't put a default value.

        31. satisfiedRequestMeter: Currently, it doesn't include the time for expired requests. I prefer to have a stat that gives the time for all requests, whether expired or not.

        32. I am not sure how useful the metric CaughtUpFollowerFetchRequestsPerSecond is.

        Joel Koshy added a comment -

        That's a good suggestion, and should work as well. I'll make that change tomorrow. It would be good to understand the lazy val implementation as well - will try and dig into some toy examples.

        30. Correct - I had reverted that in the last attachment. There was a (temporary I think) reason I needed that default but I don't remember.

        For 31 and 32 I think we should defer this to a later discussion. I actually had expiration time before but removed it. I'm not sure it makes a lot of sense. It would perhaps be useful to detect producers that are setting a very low expiration period, but even so it is driven by producer configs and would be a mash-up of values from different producers with expiring requests. Stats can be asymmetric between delayed-produce/delayed-fetch and also between expired/satisfied.

        Joel Koshy added a comment -

        Moved the valueFactory to Pool's constructor.
        Unit tests/system tests pass.
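
        For reference, a minimal sketch of the pool-with-factory shape described above (illustrative; the committed kafka.utils.Pool may differ in detail):

            import java.util.concurrent.ConcurrentHashMap

            class PoolSketch[K, V](valueFactory: Option[K => V] = None) {
              private val pool = new ConcurrentHashMap[K, V]

              // Returns the existing value for the key, creating one via the factory only if absent.
              def getAndMaybePut(key: K): V = valueFactory match {
                case Some(factory) =>
                  val current = pool.get(key)
                  if (current != null) current
                  else {
                    // putIfAbsent keeps this safe under concurrent callers; at worst the
                    // losing caller's freshly created object is discarded.
                    pool.putIfAbsent(key, factory(key))
                    pool.get(key)
                  }
                case None =>
                  throw new IllegalStateException("Empty value factory in pool")
              }

              def get(key: K): V = pool.get(key)
            }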

        Jun Rao added a comment -

        +1 on patch v4.

        Joel Koshy added a comment -

        Thanks for the reviews. Committed to 0.8.


          People

          • Assignee: Joel Koshy
          • Reporter: Joel Koshy
          • Votes: 0
          • Watchers: 4
