Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:

      Description

      Currently, metrics in Kafka use old-school JMX directly. This makes adding metrics a pain. It would be good to do one of the following:
      1. Convert to Coda Hale's metrics package (https://github.com/codahale/metrics)
      2. Write a simple metrics package

      The new metrics package should make metrics easier to add and work with, and should package up the common logic of keeping windowed gauges, histograms, counters, etc. JMX should be just one output of this.

      The advantage of the Coda Hale package is that it already exists, so we don't need to write it. The downsides are that (1) it introduces another client dependency, which can cause conflicts, and (2) it seems a bit heavy on design. The good news is that the metrics-core package doesn't seem to bring in a lot of dependencies, which is nice, though the Scala wrapper seems to want Scala 2.9. I am also a little skeptical of the approach for histograms: it uses sampling instead of bucketing, though that may be okay.
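
      For illustration, a minimal sketch of what option 1 could look like from Scala using the plain Java API of metrics-core (the class, metric names, and queue-size accessor below are hypothetical, and the factory signatures assume the metrics 2.x API):

        import java.util.concurrent.TimeUnit
        import com.yammer.metrics.Metrics
        import com.yammer.metrics.core.Gauge

        // Hypothetical holder for a few broker-side metrics (sketch only).
        class RequestMetricsSketch(queueSize: () => Int) {
          // meter: produce request rate, reported per second
          private val produceRequestRate =
            Metrics.newMeter(getClass, "ProduceRequestsPerSec", "requests", TimeUnit.SECONDS)
          // histogram: distribution of produce request sizes
          private val produceRequestSizeHist =
            Metrics.newHistogram(getClass, "ProduceRequestSize")
          // gauge: sampled on demand, e.g. the current request queue depth
          Metrics.newGauge(getClass, "RequestQueueSize", new Gauge[Int] {
            def value: Int = queueSize()
          })

          def recordProduceRequest(sizeInBytes: Long) {
            produceRequestRate.mark()
            produceRequestSizeHist.update(sizeInBytes)
          }
        }

      JMX would then be just one reporter over these registered metrics, with other reporters (ganglia, graphite, CSV) attached to the same registry.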

      1. kafka-203_v1.patch
        94 kB
        Jun Rao
      2. kafka-203_v2.patch
        123 kB
        Jun Rao

        Issue Links

          Activity

          tgautier Taylor Gautier added a comment -

          FYI we are using the JMX metrics currently.

          cburroughs Chris Burroughs added a comment -

          I think metrics is source compatible with 2.8, but I would need to investigate more.

          charmalloc Joe Stein added a comment -

          I have implemented Coda Hale's Metrics package and have it running in production.

          pros:

          • good coverage for typical needs with not much extra effort once it is integrated
          • ganglia context
          • graphite context (I don't use it, but it is a pro for someone)

          cons:

          • the package adds another dependency
          • the MetricsServlet output is JSON only, so it takes some extra effort to have a version that is also human-readable (better for debugging, something I found I needed often)
          • builds are against Scala 2.9.1 (I run Kafka built with 2.9.1 and it works fine, though porting it back to 2.8 may get hairy and tricky)

          IMHO

          • since there is no compatible Scala version in Maven, we would basically have to check out or fork the code, build it, and include it (or publish the build ourselves to Maven).
          • we should have our own version of MetricsServlet ... if someone wants to implement the embedded Jetty HTTP server, it should be configurable so that the Jetty jar is not required

          So I am +1 on using source from Coda Hale and keeping that as the metrics layer (we could check in the specific files we need/want to a kafka.metrics package and then make changes as we need; license = ASL 2.0, https://github.com/codahale/metrics/blob/master/LICENSE).

          I am comfortable with the codahale/metrics source, and I would contribute this by pulling in the parts we are looking to use, along with an initial implementation that uses them (replacing the existing JMX metrics, since this layer would handle that plus the other contexts we include, like the servlet, ganglia, and whatever else we want), allowing others to do the same for other things moving forward.

          cburroughs Chris Burroughs added a comment -

          I also have metrics all over the place in production.

          The Scala compatibility problem only exists if we use metrics-scala, which is just a pretty wrapper around the pure Java code (http://metrics.codahale.com/manual/scala/). If we don't use that, all of the dependency complications go away. Is using the Java interface palatable?

          For mx4j we made it optional on the classpath. We can follow the same pattern with metrics reporters to minimize dependency complications for kafka-as-a-library.
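
          As a rough sketch of that optional-on-the-classpath pattern (the trait, method, and class names below are hypothetical, not actual Kafka or metrics APIs):

            // Reporters implement a small trait and are instantiated by class name,
            // so the reporter jars (ganglia, graphite, ...) stay optional. Names are hypothetical.
            trait MetricsReporterSketch {
              def start(pollPeriodSecs: Long): Unit
            }

            def maybeStartReporter(className: String, pollPeriodSecs: Long) {
              try {
                val reporter = Class.forName(className).newInstance().asInstanceOf[MetricsReporterSketch]
                reporter.start(pollPeriodSecs)
              } catch {
                case _: ClassNotFoundException =>
                  // reporter jar not on the classpath; fall back to JMX only
              }
            }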

          Off topic: JSON is totally human-readable: https://addons.mozilla.org/EN-us/firefox/addon/jsonview/

          nehanarkhede Neha Narkhede added a comment -

          This is probably unblocked by KAFKA-385. It would be good to convert all existing Kafka metrics to the new metrics package.

          nehanarkhede Neha Narkhede added a comment -

          This ticket blocks metrics collection and graphing.

          junrao Jun Rao added a comment -

          I propose that we add/keep the following set of metrics. Anything missed?

          Server side:
          A. Requests:
          A1. produceRequestRate (meter, total)
          A2. fetchRequestRate (meter, follower/non-follower)
          A3. getMetadataRate (meter, total)
          A4. getOffsetRate (meter, total)
          A5. leaderAndISRRate (meter, total)
          A6. stopReplicaRate (meter, total)
          A7. produceRequestSizeHist (hist, total)
          A8. fetchResponseSizeHist (hist, total)
          A9. produceFailureRate (meter, topic/total)
          A10. fetchFailureRate (meter, topic/total)
          A11. produceRequestTime (timer, total)
          A12. fetchRequestTime (timer, total)
          A13. messagesInRate (meter, topic/total)
          A14. messagesOutRate (meter, topic/total)
          A15. messagesBytesInRate (meter, topic/total)
          A16. messagesBytesOutRate (meter, topic/total)

          B. Log:
          B1. logFlushTime (timer, total)

          C. Purgatory:
          Produce:
          C1. expiredRequestMeter (meter, partition/total)
          C2. satisfactionTimeHist (hist, total)

          Fetch:
          C3. expiredRequestMeter (meter, follower/non-follower)
          C4. satisfactionTimeHist (hist, follower/non-follower)

          Both:
          C5. delayedRequests (gauge, Fetch/Produce)

          D. ReplicaManager:
          D1. leaderPartitionCounts (gauge, total)
          D2. underReplicatedPartitionCounts (|ISR| < replication factor, gauge, total)
          D3. ISRExpandRate (meter, partition/total)
          D4. ISRShrinkRate (meter, partition/total)

          E. Controller:
          E1. requestRate (meter, total)
          E2. requestTimeHist (hist, total)
          E3. controllerActiveCount (gauge, total)

          Clients:
          F. Producer:
          F1. messageRate (meter, topic/total)
          F2. byteRate (meter, topic/total)
          F3. droppedEventRate (meter, total)
          F4. requestRate (meter, total)
          F5. requestSizeHist (hist, total)
          F6. requestTimeHist (hist, total)
          F7. resendRate (meter, total)
          F8. failedSendRate (meter, total)
          F9. getMetadataRate (meter, total)

          G. Consumer:
          G1. messageRate (meter, topic/total)
          G2. byteRate (meter, topic/total)
          G3. requestRate (meter, total)
          G4. requestSizeHist (hist, total)
          G5. requestTimeHist (hist, total)
          G6. lagInBytes (gauge, partition)

          Also, I propose that we remove the following metrics since they are either not very useful or are redundant.
          Purgatory:
          Produce:

          • caughtUpFollowerFetchRequest (meter, partition/total): not very useful
          • followerCatchupTime (hist, total): not very useful
          • throughputMeter (meter, partition/total): same as bytesIn
          • satisfiedRequestMeter (meter, total): not very useful

          Fetch:

          • satisfiedRequestMeter (meter, total): not very useful
          • throughputMeter (meter, partition/total): same as bytesOut

          Both

          • satisfactionRate (meter, Fetch/Produce): not very useful
          • expirationRate (meter, Fetch/Produce/topic): already covered at the Produce/Fetch level

          jkreps Jay Kreps added a comment -

          There are some details around how we track request-level stats. The idea was to begin and end request measurement at the socket server so it includes the full lifecycle. Each request would have the following phases:
          read time - time spent reading data off the network
          queue time - time spent waiting for a processing thread to handle the request
          api time - time spent in the api layer
          queue time - time spent waiting for a network thread to handle the request
          write time - time spent writing data (which would include the sendfile time)

          The implementation could just be to extend the Request object we have so that it supports something like:
          request.begin("read")
          // do some reading
          request.end("read")
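
          One way this could be fleshed out, purely as a sketch (the Request class, phase names, and metric names below are illustrative, not the actual patch):

            import com.yammer.metrics.Metrics
            import com.yammer.metrics.core.Histogram
            import scala.collection.mutable

            // Illustrative sketch: record per-phase elapsed time into a histogram per api/phase.
            class RequestSketch(val apiName: String) {
              private val phaseStartNs = mutable.Map.empty[String, Long]
              private val phaseHists = mutable.Map.empty[String, Histogram]

              def begin(phase: String) {
                phaseStartNs(phase) = System.nanoTime
              }

              def end(phase: String) {
                val elapsedNs = System.nanoTime - phaseStartNs(phase)
                // e.g. "Produce-readTimeNs"; the metrics registry is expected to return the
                // existing histogram when one with the same name was already registered
                val hist = phaseHists.getOrElseUpdate(phase,
                  Metrics.newHistogram(getClass, apiName + "-" + phase + "TimeNs"))
                hist.update(elapsedNs)
              }
            }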

          It would be nice to have these stats at the client id level, but there are two problems:
          1. That would be a lot of histogram data
          2. Currently the socket server is not aware of the client id.

          So I recommend we just track this info on a per-api basis for now. We can revisit in the future to get more in-depth instrumentation.

          junrao Jun Rao added a comment -

          Attached patch v1.

          Patch overview:
          1. Added general support to collect the time breakdown (queueTime, localTime, remoteTime, sendTime, totalTime) for all types of requests. This needed a small refactoring of RequestChannel.Request to include the deserialized request object.
          2. Removed some metrics in DelayedRequestMetrics since they are now covered by #1.
          3. Fixed Pool.getMaybePut() to make sure that the new object is only created once (a sketch of the idea follows this list).
          4. Converted all existing metrics to use Coda Hale and added some new metrics.
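
          For readers unfamiliar with the Pool fix in item 3, the idea is roughly the following (a sketch only, not the code in the patch):

            import java.util.concurrent.ConcurrentHashMap

            // Sketch of "create at most once": check the map first, and only invoke the
            // value factory under a lock after re-checking, so each key gets one value.
            class PoolSketch[K, V](valueFactory: K => V) {
              private val pool = new ConcurrentHashMap[K, V]
              private val createLock = new Object

              def getAndMaybePut(key: K): V = {
                val current = pool.get(key)
                if (current != null)
                  current
                else
                  createLock synchronized {
                    val existing = pool.get(key)
                    if (existing != null)
                      existing
                    else {
                      val value = valueFactory(key)
                      pool.put(key, value)
                      value
                    }
                  }
              }
            }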

          The list of new and converted metrics is the following.
          Server side:
          A. Requests
          for each request type:
          A1. requestRate (meter, total)
          A2. queueTime (hist, total)
          A3. localTime (hist, total)
          A4. remoteTime (hist, total)
          A5. sendTime (hist, total)
          A6. totalTime (hist, total)
          For Fetch/Produce
          A7. produceFailureRate (meter, topic/total)
          A8. fetchFailureRate (meter, topic/total)
          A9. messagesInRate (meter, topic/total)
          A10. messagesOutRate (meter, topic/total)
          A11. messagesBytesInRate (meter, topic/total)
          A12. messagesBytesOutRate (meter, topic/total)
          All
          A13. requestQueueSize (gauge, total)

          B. Log:
          B1. logFlushTime (timer, total)
          B2. logSegments (gauge, per log)
          B3. logEndOffset (gauge, per log)

          C. Purgatory:
          Produce:
          C1. expiredRequestMeter (meter, partition/total)

          Fetch:
          C2. expiredRequestMeter (meter, follower/non-follower)

          Both:
          C3. delayedRequests (gauge, Fetch/Produce)

          D. ReplicaManager:
          D1. leaderPartitionCounts (gauge, total)
          D2. underReplicatedPartitionCounts (|ISR| < replication factor, gauge, total)
          D3. ISRExpandRate (meter, partition/total)
          D4. ISRShrinkRate (meter, partition/total)

          E. Controller:
          E1. controllerActiveCount (gauge, total)

          Clients:
          F. Producer:
          F1. messageRate (meter, topic/total)
          F2. byteRate (meter, topic/total)
          F3. droppedEventRate (meter, total)
          F4. producerQueueSize (gauge, per send thread)
          F5. requestSizeHist (hist, total)
          F6. requestTimeAndRate (timer, total)
          F7. resendRate (meter, total)
          F8. failedSendRate (meter, total)

          G. Consumer:
          G1. messageRate (meter, topic/total)
          G2. byteRate (meter, topic/total)
          G3. requestSizeHist (hist, total)
          G4. requestTimeAndRate (timer, total)
          G5. lagInBytes (gauge, partition)

          jjkoshy Joel Koshy added a comment -

          I think this patch looks great and this list of stats is a good start. I
          have some minor comments:

          1) Rebase - the latest patch applies cleanly to r1378264.

          2) The following are just my preferences on naming. What you have should be
          fine, but we should make sure the stat names are as intuitive as
          possible. We should come up with a naming convention for stats and add it
          to our coding convention.

          a) Some timer stats may be better named. E.g., SimpleConsumer
          ConsumerRequestTime will include both request rate and request duration
          which is not very intuitive. OTOH I'm having trouble thinking of a naming
          convention: I would suggest just ConsumerRequestStats - but the size stat
          would be outside then.

          b) Partition.scala:
          ISRExpandRate -> ISRExpandEventRate
          ISRShrinkRate -> ISRShrinkEventRate

          c) Log.scala:
          "LogSegments" -> "NumLogSegments"

          d) ConsumerTopicStat.scala:
          "Total" -> "AllTopics" Also, what if there's a topic called "Total"?
          We may want to name this label such that it is an illegal topic
          name (KAFKA-495) - say, "All/Topics".

          e) SimpleConsumer.scala:
          "ConsumerRequestTime" -> see above.

          f) FileMessageSet.scala:
          "LogFlush" -> "LogFlushStats"

          g) RequestChannel.scala:

          i) Instead of "regular" and "follower" how about "consumer" and
          "replica"?

          ii) endRequestTracking -> updateRequestMetrics

          iii) responseComplet (typo)

          iv) For timing stats, may be better to include the unit as part of the
          metric names (e.g., TotalTimeNs).

          v) SendTime -> ResponseSendTime(Ns)

          vi) May be useful to add a comment that simply lays out the phases to
          make the code clearer:
          /* received (start time) -> in queue (queue time) -> dequeued for
          api-local processing -> [api remote processing] -> send response */

          h) AsyncProducerStats.scala:
          DroppedEvent -> DroppedEventsPerSec
          Resentevent -> ResendEventsPerSec
          resents -> resends
          FailedSend -> FailedSendsPerSec
          (or maybe we should just follow a convention: <stat>Rate which
          defaults to <stat> per sec)
          FailedSendtRate (typo)

          i) KafkaApis.scala
          byteInRate -> bytesInRate; byteOutRate -> bytesOutRate
          ExpiresPerSecond -> ExpirationsPerSec

          j) KafkaRequestHandlers.scala
          MessageInPerSec -> IncomingMessagesPerSec

          3) There are some places (SimpleConsumer, FileMessageSet, SyncProducer)
          where you use metrics timers. Instead of this:

          val timer = newTimer(...)
          ...
          val ctx = timer.time()
          try {
            // do something
          } finally {
            ctx.stop()
          }

          You can use the following equivalent pattern:

          val timer = new KafkaTimer(underlying)
          timer.time {
            // do something
          }
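
          For context, such a wrapper can be just a few lines. A sketch, assuming the metrics-core Timer/TimerContext API (the actual KafkaTimer may differ):

            import com.yammer.metrics.core.Timer

            // Runs the block inside a timer context and stops the context even if the block throws.
            class KafkaTimerSketch(underlying: Timer) {
              def time[A](f: => A): A = {
                val ctx = underlying.time()
                try
                  f
                finally
                  ctx.stop()
              }
            }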

          4) ZookeeperConsumerConnector: These JMX operations are actually useful to
          consumers right?

          5) DefaultEventHandler: should byte rate be updated here or only after
          sending? Although it does seem useful to have the global byte rate even
          for those that are subsequently dropped.

          6) SyncProducer.scala: use KafkaTimer. Also, same comment on naming for
          timers described above.

          7) AbstractFetcherThread.scala: FetcherLagMetrics.lock unused.

          8) KafkaApis.scala:
          a) Line 108 unused
          b) One caveat in removing the per key ProducerRequestPurgatory stats is
          if there is a key that has an intermittently slow follower you won't be
          able to narrow it down very easily (since the entire request will
          expire). OTOH you will have that stat available from the follower - it's
          just that you will need to "search" for the follower that is causing the
          expirations. So I think it's fine to remove it as it makes the code a lot
          simpler.

          9) Pool.scala: good idea.

          nehanarkhede Neha Narkhede added a comment -

          It's great to see a patch that fixes metrics.

          1. Partition
          In the isUnderReplicated API, shouldn't the in-sync replica set size be compared to the replication factor for that partition and not the default replication factor?

          2. ZookeeperConsumerConnector
          There are a bunch of interesting metrics here that are very useful while troubleshooting. For example,
          2.1 Queue size per topic and consumer thread id: When the consumer client's processing slows down, the consumer's queues back up. Currently, to troubleshoot this issue, we need to take thread dumps. If we had the right monitoring on the
          consumer, we could just look at the metrics to figure out the problem.
          2.2 Fetch requests per second per fetcher: Useful to know the progress of the fetcher thread. Here, the bean could be named after the broker id that the fetcher is connected to, somewhat along the lines of the per-key purgatory metrics.

          3. KafkaController
          3.1 We need a way to tell if a partition is offline. If all replicas of a partition go offline, no leader can be elected for that partition and an alert would have to be raised.
          3.2 We also need to be able to measure -
          3.2.1 leader election latency
          3.2.2 Leader election rate

          4. ReplicaManager
          4.1 Rename ISRExpandRate to isrExpandRate
          4.2 Rename ISRShrinkRate to isrShrinkRate
          4.3 I'm not sure how useful it is to have a count for leaders and under-replicated partitions. We do, however, need a per-partition status that tells whether the partition is offline or under-replicated.

          5. TopicMetadataTest
          Wrap the long line

          6. system_test
          We have to add the new metrics to the metrics.json file so that we can view the metrics on every test run. Not sure if you want to push that to a separate JIRA or not?

          junrao Jun Rao added a comment -

          Attached patch v2. Overview of additional changes.

          1. Rebased.

          2. DefaultEventHandler: Added a metric for serializationErrorRate. Also changed the serialization error handling a bit depending on whether the send is sync or async.

          3. Use the following convention to distinguish the all-topic metrics from the per-topic ones: for a metric X, the aggregate becomes AllTopicsX and topic Y becomes Y-X (see the short example after this list).

          4. Made a pass over the metric names and tried to keep them consistent.

          5. Updated metrics.json with the new metrics. Right now we start a separate JMX tool for each JMX bean, and too many beans will slow down the system test, so I only exposed a subset of the metrics. Once the JMX tool issue is resolved, we can add more beans for collection and graphing.
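
          As a concrete illustration of the convention in item 3 (the helper and the example metric name below are hypothetical, not code from the patch):

            // Hypothetical helper showing the naming convention for a metric X.
            def metricName(base: String, topic: Option[String]): String = topic match {
              case Some(t) => t + "-" + base          // e.g. "mytopic-MessagesPerSec"
              case None    => "AllTopics" + base      // e.g. "AllTopicsMessagesPerSec"
            }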

          Review comments:
          Joel:
          4) Consumer lag is the most useful metric, and an alert can be set on it. LogEndOffset and ConsumerOffset are less useful and can be obtained from tools.

          5) The problem is that we only know the size of a message after it is serialized, since the message itself can be of any type.

          8) If a follower is slow, it will eventually be dropped out of the ISR, and we have metrics on both the ISR shrink rate and under-replicated partitions to track this.

          Neha:
          1. For that, the broker needs to know the replication factor of a partition. This information needs to be sent to the broker in the LeaderAndISRRequest. I will leave that to kafka-340.

          4.3 LeaderCount is useful to see if client loads are balanced among brokers, and a global "under-replicated partition count" is convenient for setting up an alert (otherwise, one has to set it up on each partition).

          The rest of the review comments are all addressed.

          jjkoshy Joel Koshy added a comment -

          +1 on v2.

          The following are all minor comments.

          2.1 - For the TODO in Partition.scala, can you add a comment in the jira
          where it will be addressed (or create a separate jira) so we don't lose
          track of it.

          2.2 - SimpleConsumer.scala: FetchRequestRateAndTimeMs ->
          FetchRequestRateAndDurationMs. Similar edits in FileMessageSet.scala for
          LogFlushRateAndTimeMs; SyncProducer.scala for
          ProduceRequestRateAndTimeMs

          2.3 - ProducerTopicStat.scala: resents -> resends

          2.4 - DefaultEventHandler.scala:
          a - val isSync = "sync".equals(config.producerType)
          b - So exceptions are no longer thrown for async producers on a
          serialization error. This will have an impact on KAFKA-496. Can you
          add a comment there after check-in?

          2.5 - ReplicaManager: the meter name is inconsistent with the convention
          used elsewhere.

          junrao Jun Rao added a comment -

          Thanks for the review. Committed to 0.8.

          2.1 Created kafka-510 to track it.

          2.2 Left the name as it is since it's shorter.

          Addressed the rest of the comments.


            People

            • Assignee:
              junrao Jun Rao
              Reporter:
              jkreps Jay Kreps
            • Votes:
              1
              Watchers:
              8
