KAFKA-826

Make Kafka 0.8 depend on metrics 2.2.0 instead of 3.x

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core

      Description

      In order to mavenize Kafka 0.8, we have to depend on metrics 2.2.0, since metrics 3.x is both a huge change and not an officially supported release.
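
      For context, the pin amounts to the following sbt dependency declarations. This is a sketch based on the build changes in the attached patches (see the project/Build.scala hunk in the diff later in this thread); the snapshot metrics jars checked into core/lib are replaced by released artifacts resolved through Ivy:

      // Added to the shared libraryDependencies in project/Build.scala:
      libraryDependencies ++= Seq(
        "com.101tec"         % "zkclient"           % "0.2",
        "com.yammer.metrics" % "metrics-core"       % "2.2.0",
        "com.yammer.metrics" % "metrics-annotation" % "2.2.0"
      )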

      Attachments

      1. kafka-fix-for-826-take2.patch (9 kB, Dragos Manolescu)
      2. kafka-fix-for-826-complete.patch (15 kB, Dragos Manolescu)
      3. kafka-fix-for-826.patch (9 kB, Dragos Manolescu)

        Issue Links

          Activity

          Cosmin Lehene added a comment - https://issues.apache.org/jira/browse/KAFKA-960
          Jun Rao added a comment - edited

          Could you file a new JIRA to track this?

          Cosmin Lehene added a comment - edited

          Now that metrics 3.0 has been released (http://metrics.codahale.com/about/release-notes/), perhaps we should consider upgrading back to it?

          Dragos Manolescu added a comment -

          Jun, I understand what you're after. The sbt-assembly task lists the JARs it packages together when executed:

          > assembly-package-dependency
          [info] Including metrics-core-2.2.0.jar
          [info] Including metrics-annotation-2.2.0.jar
          [info] Including snappy-java-1.0.4.1.jar
          [info] Including slf4j-api-1.7.2.jar
          [info] Including log4j-1.2.15.jar
          [info] Including zkclient-0.2.jar
          [info] Including scala-compiler.jar
          [info] Including zookeeper-3.3.4.jar
          [info] Including slf4j-simple-1.6.4.jar
          [info] Including scala-library.jar
          [info] Including jopt-simple-3.2.jar

          After the update task runs, these JARs are available from the Ivy cache (rather than the target folder, which, if my understanding is correct, holds only artifacts generated during compilation and packaging).

          I am not aware of a mechanism to have sbt-assembly place the dependencies in the target folder. As plugins like sbt-assembly, sbt-proguard, sbt-onejar and others indicate, fat JARs are the typical packaging mechanism for releasing JVM bytecode.
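
          One related mechanism worth noting: sbt itself can copy resolved dependency jars out of the Ivy cache on each update, though into lib_managed/ rather than target/. A minimal sketch, assuming an sbt 0.12-style Build.scala (the project name here is illustrative):

          import sbt._
          import Keys._

          object DepsDemoBuild extends Build {
            // retrieveManaged := true makes sbt copy every resolved dependency jar
            // into <project>/lib_managed/ after each `update`, one file per artifact.
            lazy val root = Project("root", file("."),
              settings = Defaults.defaultSettings ++ Seq(retrieveManaged := true))
          }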

          Jun Rao added a comment -

          Dragos,

          Thanks for the patch. It seems that you merged all dependent jars into a fat jar, kafka-assembly-0.8-SNAPSHOT-deps.jar. Is it possible to keep the individual jars in target/scala-2.8.0? That way, if people want to release the binary, it's clear which jars it depends on and at which versions.

          Dragos Manolescu added a comment -

          You're welcome; thank you all for verifying independently.

          Neha Narkhede added a comment -

          Thanks a lot for the patches! +1 on the complete patch. In addition to that, I will check in a change that deletes core/lib.

          Swapnil Ghike added a comment -

          Thanks Matt for verifying! This patch works well. Perhaps a committer who is familiar with sbt should take a look.

          Matt Christiansen added a comment -

          I made my own branch and applied this patch; in all my testing it seems to be doing great. I would also love to see it committed.

          Scott Carey added a comment -

          Can a committer look at this? This would be great to get into the 0.8 branch soon, and it looks good to me.
          I have a cleanup of sbt that significantly simplifies it further, which I'd like to do as part of KAFKA-854, but it will fail to merge with this change.

          Dragos Manolescu added a comment -

          Hmm, it looks like for some reason the metrics JARs didn't end up on your CLASSPATH.

          Here's the same patch with one additional update that packages the required libraries into a single jar, kafka-assembly-0.8-SNAPSHOT-deps.jar. This change removes ALL references to ~/.ivy2/cache from the script and requires one additional step: execute the "assembly-package-dependency" sbt task (provided by the sbt-assembly plugin that was already included).

          Here are the steps:

          rm -rf ~/.ivy2/cache/
          find . -name target | xargs rm -rf
          bash sbt update
          bash sbt package
          bash sbt assembly-package-dependency # NEW STEP
          bin/zookeeper-server-start.sh config/zookeeper.properties
          bin/kafka-server-start.sh config/server.properties

          I verified and the server starts up fine. Please LMK if you run into problems.

          Here's the output from assembly-package-dependency:

          % bash sbt assembly-package-dependency
          [info] Loading global plugins from /Users/dragos.manolescu/.sbt/plugins
          [info] Loading project definition from /Users/dragos.manolescu/Repos/kafka/project
          [info] Set current project to Kafka (in build file:/Users/dragos.manolescu/Repos/kafka/)
          [info] Including metrics-core-2.2.0.jar
          [info] Including scala-compiler.jar
          [info] Including zkclient-0.2.jar
          [info] Including metrics-annotation-2.2.0.jar
          [info] Including snappy-java-1.0.4.1.jar
          [info] Including log4j-1.2.15.jar
          [info] Including slf4j-api-1.7.2.jar
          [info] Including zookeeper-3.3.4.jar
          [info] Including jopt-simple-3.2.jar
          [info] Including slf4j-simple-1.6.4.jar
          [info] Including scala-library.jar
          [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
          [warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
          [warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE' with strategy 'rename'
          [warn] Merging 'LICENSE.txt' with strategy 'rename'
          [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
          [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
          [warn] Strategy 'discard' was applied to a file
          [warn] Strategy 'rename' was applied to 5 files
          [info] Packaging /Users/dragos.manolescu/Repos/kafka/core/target/scala-2.8.0/kafka-assembly-0.8-SNAPSHOT-deps.jar ...
          [info] Done packaging.
          [success] Total time: 44 s, completed Apr 8, 2013 10:58:33 AM
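
          The 'rename' and 'discard' lines above are sbt-assembly merge strategies for files that appear in more than one dependency jar. As a sketch of how such strategies are customized, assuming the sbt 0.12-era sbt-assembly plugin API (the build may simply be using the plugin's defaults):

          import sbtassembly.Plugin._
          import AssemblyKeys._

          // Discard duplicate manifests; defer to the plugin's default strategy
          // (which renames colliding LICENSE/NOTICE files) for everything else.
          lazy val kafkaAssemblySettings = assemblySettings ++ Seq(
            mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
              {
                case "META-INF/MANIFEST.MF" => MergeStrategy.discard
                case x                      => old(x)
              }
            }
          )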

          Swapnil Ghike added a comment - edited

          Thanks, the unit tests were OK. But while trying to boot up a Kafka server, I got the following error.

          This is what I did:
          rm -rf ~/.ivy2
          patch -p1 < kafka-fix-for-826-complete.patch
          git rm -r core/lib
          ./sbt clean
          ./sbt update
          ./sbt package
          bin/zookeeper-server-start.sh config/zookeeper.properties
          bin/kafka-server-start.sh config/server.properties

          [2013-04-08 00:48:29,743] FATAL (kafka.Kafka$)
          java.lang.NoClassDefFoundError: com/yammer/metrics/reporting/CsvReporter
          at kafka.metrics.KafkaCSVMetricsReporter.init(KafkaCSVMetricsReporter.scala:53)
          at kafka.metrics.KafkaMetricsReporter$$anonfun$startReporters$1.apply(KafkaMetricsReporter.scala:60)
          at kafka.metrics.KafkaMetricsReporter$$anonfun$startReporters$1.apply(KafkaMetricsReporter.scala:58)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
          at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
          at kafka.metrics.KafkaMetricsReporter$.startReporters(KafkaMetricsReporter.scala:58)
          at kafka.Kafka$.main(Kafka.scala:36)
          at kafka.Kafka.main(Kafka.scala)
          Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.reporting.CsvReporter
          at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
          at java.security.AccessController.doPrivileged(Native Method)
          at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
          at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
          ... 8 more

          I am not currently sure what's causing this issue.

          Dragos Manolescu added a comment -

          Here's the patch for metrics 2.2.0, with (i) code updates for the metrics API, (ii) build updates to reference zkclient 0.2 and metrics 2.2.0, and (iii) script updates with the appropriate classpaths. I have verified it and all tests pass:

          [info] Passed: : Total 175, Failed 0, Errors 0, Passed 175, Skipped 0
          [success] Total time: 166 s, completed Apr 6, 2013 4:19:27 PM

          Please LMK if you run into problems.

          Dragos Manolescu added a comment -

          I'm just starting to work on it; I'll try to finish it over the weekend.

          Swapnil Ghike added a comment -

          Dragos, it seems like we are headed towards agreeing on using 2.2.0. Do you mind including the review suggestions in your patch? If you are busy, I can finish it up too. Thanks.

          Matt Christiansen added a comment -

          I also like this; it allows Kafka's metrics to be integrated with the other metrics we are already collecting under 2.2, and for it all to be reported to Ganglia.

          Scott Carey added a comment -

          Sounds good to me.

          Neha Narkhede added a comment -

          I agree with Jun that we should release Kafka 0.8 with metrics 2.x. Let's wait until metrics 3.x APIs are stable.

          Dragos Manolescu added a comment -

          Thank you, Jun, for the analysis! From my perspective the approach you suggested is sound. FWIW, a couple of weeks ago I "ported" the Kafka code to the latest (at the time) metrics 3.0.0 APIs. The code compiled and the tests passed, though I don't know to what extent that demonstrates that the instrumentation and reporting work as they should. I bring it up because I feel that once 3.0.0 is released, the port will be mechanical.

          Jun Rao added a comment -

          I pinged Coda and the following are the answers that I got.

          1. How stable is 3.0.0-Beta1?
          The code is actually less complicated than Metrics 2 and far better tested: no thread pools, no lifecycles, no static references, just simple objects.

          2. Is it true that 3.0.0-Beta1 has no classpath conflict with 2.x? In other words, can an application use both jars in the same JVM?
          Many of the classes have changed names and/or packages, but I didn't intend for there to be no classpath conflicts. You might be able to get away with it, but someone somewhere will step on something sharp, I'm sure.

          3. Will there be any API changes between 3.0.0-Beta1 and the final 3.0 release?
          Probably, yes.

          4. Were there any API changes between 3.0.0-Beta1 and 3.0.0-SNAPSHOT (the one that we are currently using in Kafka 0.8)?
          I don't know which snapshot build you're using, but I would imagine there are a lot of changes.

          Based on the above, my feeling is that the Kafka 0.8 release should use metrics 2.x, since (1) it has been stable; (2) people who depend on 2.x take on no risk in using Kafka 0.8; and (3) the 3.x API is likely to change again. In a post-0.8 release, we can upgrade to metrics 3.x once the final version is released and deemed stable. Any concerns with this approach?

          Maxime Brugidou added a comment - edited

          Yes, there has been some work, and it breaks when I try to use 3.0.0-BETA1 from the 0.8 branch. We need to have a MetricsRegistry (static or injected) and use the new register() method to add gauges/metrics. Also, the core namespace has been removed.

          Is anyone working on a patch to move to BETA1 (and to remove the core/lib/metrics*.jar files)?

          By the way, I don't think we use metrics-annotation, just metrics-core, right?
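
          For reference, a sketch of the registration style this refers to, assuming the metrics 3.0 API (where the registry class is com.codahale.metrics.MetricRegistry and Gauge exposes getValue); the object and metric names here are illustrative:

          import com.codahale.metrics.{Gauge, MetricRegistry}

          object BrokerMetrics {
            // In 3.x every metric lives in an explicit registry instead of the
            // static com.yammer.metrics.Metrics singleton that 2.x relied on.
            val registry = new MetricRegistry()

            def newQueueSizeGauge(queue: java.util.Queue[_]): Gauge[Int] =
              registry.register("kafka.producer.ProducerQueueSize", new Gauge[Int] {
                override def getValue: Int = queue.size() // 2.x named this value()
              })
          }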

          Scott Carey added a comment -

          Given the volume of work that has been done on metrics 3 between the snapshot on March 12 and the recent BETA1, I am comfortable with it; a full release is likely to be complete before Kafka 0.8 is through its beta.

          Scott Carey added a comment -

          My main complaint prior to this was twofold:

          • Early versions of metrics 3.x had classpath collisions with 2.2.x, so we would be unable to have both in the same application, and we already use 2.2.
          • There was no official version of metrics 3.x published anywhere to consume, no roadmap, and the developer was MIA for months. Kafka would have had to publish its own artifact version, which gets messy fast and would likely have API compatibility issues with any future final metrics 3.0.x release, making it very difficult to use Kafka 0.8 and the final metrics 3.0 in the same application.

          If 3.x is stable enough API-wise, I'd be fine with keeping Kafka on it. I believe 3.x no longer collides with 2.2.x on a classpath, but I have not tested that recently; that would mean we could migrate to 3.x at our own pace rather than having to time it to be in sync with our use of Kafka 0.8.

          Otis Gospodnetic added a comment -

          Jun Rao - I'm on the mailing list for that metrics lib, and a few weeks (1-2 months?) ago Coda wrote an email saying he was not happy with a number of things in 2.x that he completely reworked in 3.x. I don't recall the details, but I recall feeling like 3.x is the version I'd use if I had to pick between 2.x and 3.x. Unless Kafka 0.8 is going to be released within the next week or two, I'd personally go with 3.x, unless you are open to moving from 2.x to 3.x in the near future in 0.8.1.

          Swapnil Ghike added a comment -

          Dragos, thanks for the input. We can definitely use your patch if we decide to downgrade metrics to 2.2.0. Could you please address the following comments and re-submit the patch?

          1. Could you move the metrics-core, metrics-annotation and zkclient library dependencies to core/build.sbt? You will need to replace the old organization and version number of zkclient there with the ones you added to KafkaBuild.
          2. Could you append the corresponding paths to the classpath in bin/kafka-run-class.sh? Also, please remove core/lib/*.jar from the classpath, since we are moving the three jars there to Ivy.
          3. git rm core/lib, for the same reason mentioned in 2.

          Apart from your patch, we will need to fix the csv reporter separately.

          Jun Rao added a comment -

          Otis,

          The reason that we want to downgrade metrics to 2.x is that quite a few people feel it is the most stable version that they can depend on. Do you know how stable this beta release is? Will there be any API or bean name changes between the final 3.x release and this beta release?

          In general, do people feel comfortable using the metrics 3.0 beta release?

          Otis Gospodnetic added a comment -

          Ah, what timing! Metrics 3.0.0 BETA1 has just been pushed to Maven Central with goodies:
          Coda Hale <coda.hale@gmail.com> Apr 01 02:02PM -0700

          Metrics 3.0.0-BETA1 is on its way to Maven Central! It includes the following changes:

          • Total overhaul of most of the core Metrics classes:
          • Metric names are now just dotted paths like com.example.Thing, allowing for very flexible scopes, etc.
          • Meters and timers no longer have rate or duration units; those are properties of reporters.
          • Reporter architecture has been radically simplified, fixing many bugs.
          • Histograms and timers can take arbitrary reservoir implementations.
          • Added sliding window reservoir implementations.
          • Added MetricSet for sets of metrics.
          • Changed package names to be OSGi-compatible and added OSGi bundling.
          • Extracted JVM instrumentation to metrics-jvm.
          • Extracted Jackson integration to metrics-json.
          • Removed metrics-guice, metrics-scala, and metrics-spring.
          • Renamed metrics-servlet to metrics-servlets.
          • Renamed metrics-web to metrics-servlet.
          • Renamed metrics-jetty to metrics-jetty8.
          • Many more small changes!

          Wouldn't it make sense to invest the effort into moving to 3.0.0 now instead? Otherwise, if the gap between 0.8 and the post-0.8 release is as big as the gap between 0.7.2 and 0.8, it will be a while before Kafka moves to this new metrics package.

          Swapnil Ghike added a comment -

          The take2 patch applies quite smoothly! Thanks a ton; I will check today how it works out.

          Dragos Manolescu added a comment -

          Here it is, attached. Please LMK if you still have problems.

          Swapnil Ghike added a comment -

          Hi Dragos, sorry for bugging you again. If I copy-paste your new patch, I am not able to apply it. Do you mind attaching it as a file? Thanks for the help, Dragos!

          Dragos Manolescu added a comment -

          I'm sorry you've had problems with this patch. You should be good to go:

          seac02jh0rjdkq4 ~/Repos/kafka% git status [92]
          # On branch 0.8
          # Untracked files:
          #   (use "git add <file>..." to include in what will be committed)
          #
          #   .idea_modules/
          nothing added to commit but untracked files present (use "git add" to track)
          seac02jh0rjdkq4 ~/Repos/kafka% patch -p1 --dry-run < ../kafka-fix-for-826-take2.patch [93]
          patching file core/src/main/scala/kafka/cluster/Partition.scala
          patching file core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
          patching file core/src/main/scala/kafka/controller/KafkaController.scala
          patching file core/src/main/scala/kafka/log/Log.scala
          patching file core/src/main/scala/kafka/network/RequestChannel.scala
          patching file core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
          patching file core/src/main/scala/kafka/server/AbstractFetcherThread.scala
          patching file core/src/main/scala/kafka/server/ReplicaManager.scala
          patching file core/src/main/scala/kafka/server/RequestPurgatory.scala
          patching file core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
          patching file project/Build.scala
          patching file project/build/KafkaProject.scala
          Dragos Manolescu added a comment -

          diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
          index 2ca7ee6..e49bdae 100644
          --- a/core/src/main/scala/kafka/cluster/Partition.scala
          +++ b/core/src/main/scala/kafka/cluster/Partition.scala
          @@ -60,7 +60,7 @@ class Partition(val topic: String,
             newGauge(
               topic + "-" + partitionId + "-UnderReplicated",
               new Gauge[Int] {
          -      def getValue = {
          +      def value = {
                   if (isUnderReplicated) 1 else 0
                 }
               }
          diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
          index 9a5fbfe..398618f 100644
          --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
          +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
          @@ -650,7 +650,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                   newGauge(
                     config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
                     new Gauge[Int] {
          -            def getValue = q.size
          +            def value = q.size
                     }
                   )
                 })
          diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
          index 74614d8..5f6eb3c 100644
          --- a/core/src/main/scala/kafka/controller/KafkaController.scala
          +++ b/core/src/main/scala/kafka/controller/KafkaController.scala
          @@ -97,14 +97,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             newGauge(
               "ActiveControllerCount",
               new Gauge[Int] {
          -      def getValue() = if (isActive) 1 else 0
          +      def value() = if (isActive) 1 else 0
               }
             )

             newGauge(
               "OfflinePartitionsCount",
               new Gauge[Int] {
          -      def getValue: Int = {
          +      def value(): Int = {
                   controllerContext.controllerLock synchronized {
                     controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader))
                   }
          diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
          index 7d71451..451775b 100644
          --- a/core/src/main/scala/kafka/log/Log.scala
          +++ b/core/src/main/scala/kafka/log/Log.scala
          @@ -130,10 +130,10 @@ private[kafka] class Log(val dir: File,
             debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))

             newGauge(name + "-" + "NumLogSegments",
          -           new Gauge[Int] { def getValue = numberOfSegments }
          +           new Gauge[Int] { def value = numberOfSegments }
             )

             newGauge(name + "-" + "LogEndOffset",
          -           new Gauge[Long] { def getValue = logEndOffset }
          +           new Gauge[Long] { def value = logEndOffset }
             )

             /* The name of this log */
             def name = dir.getName()
          diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
          index 209fdfa..c0e0dfc 100644
          --- a/core/src/main/scala/kafka/network/RequestChannel.scala
          +++ b/core/src/main/scala/kafka/network/RequestChannel.scala
          @@ -99,7 +99,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
             newGauge(
               "RequestQueueSize",
               new Gauge[Int] {
          -      def getValue = requestQueue.size
          +      def value = requestQueue.size
               }
             )
          diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
          index 6691147..090400d 100644
          --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
          +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
          @@ -36,7 +36,7 @@ class ProducerSendThread[K,V](val threadName: String,
             newGauge(clientId + "-ProducerQueueSize",
               new Gauge[Int] {
          -      def getValue = queue.size
          +      def value = queue.size
               }
             )

             override def run {
          diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
          index 087979f..2e026e6 100644
          --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
          +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
          @@ -195,7 +195,7 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
             newGauge(
               metricId + "-ConsumerLag",
               new Gauge[Long] {
          -      def getValue = lagVal.get
          +      def value = lagVal.get
               }
             )
          diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
          index 68e712c..44ad562 100644
          --- a/core/src/main/scala/kafka/server/ReplicaManager.scala
          +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
          @@ -57,7 +57,7 @@ class ReplicaManager(val config: KafkaConfig,
             newGauge(
               "LeaderCount",
               new Gauge[Int] {
          -      def getValue = {
          +      def value = {
                   leaderPartitionsLock synchronized {
                     leaderPartitions.size
                   }
          @@ -67,13 +67,13 @@ class ReplicaManager(val config: KafkaConfig,
             newGauge(
               "PartitionCount",
               new Gauge[Int] {
          -      def getValue = allPartitions.size
          +      def value = allPartitions.size
               }
             )
             newGauge(
               "UnderReplicatedPartitions",
               new Gauge[Int] {
          -      def getValue = {
          +      def value = {
                   leaderPartitionsLock synchronized {
                     leaderPartitions.count(_.isUnderReplicated)
                   }
          diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
          index afe9e22..c064c5c 100644
          --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
          +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
          @@ -72,14 +72,14 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
             newGauge(
               "PurgatorySize",
               new Gauge[Int] {
          -      def getValue = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
          +      def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
               }
             )

             newGauge(
               "NumDelayedRequests",
               new Gauge[Int] {
          -      def getValue = expiredRequestReaper.unsatisfied.get()
          +      def value = expiredRequestReaper.unsatisfied.get()
               }
             )
          diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
          index a3f85cf..fe5bc09 100644
          --- a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
          +++ b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
          @@ -35,20 +35,20 @@ class KafkaTimerTest extends JUnit3Suite {
               timer.time {
                 clock.addMillis(1000)
               }
          -    assertEquals(1, metric.getCount())
          -    assertTrue((metric.getMax() - 1000).abs <= Double.Epsilon)
          -    assertTrue((metric.getMin() - 1000).abs <= Double.Epsilon)
          +    assertEquals(1, metric.count())
          +    assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
          +    assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
             }

             private class ManualClock extends Clock {

               private var ticksInNanos = 0L

          -    override def getTick() = {
          +    override def tick() = {
                 ticksInNanos
               }

          -    override def getTime() = {
          +    override def time() = {
                 TimeUnit.NANOSECONDS.toMillis(ticksInNanos)
               }
          diff --git a/project/Build.scala b/project/Build.scala
          index 4bbdfee..b8b476b 100644
          --- a/project/Build.scala
          +++ b/project/Build.scala
          @@ -17,7 +17,6 @@

           import sbt._
           import Keys._
          -import java.io.File

           import scala.xml.{Node, Elem}
           import scala.xml.transform.{RewriteRule, RuleTransformer}
          @@ -34,7 +33,10 @@ object KafkaBuild extends Build {
               libraryDependencies ++= Seq(
                 "log4j" % "log4j" % "1.2.15",
                 "net.sf.jopt-simple" % "jopt-simple" % "3.2",
          -      "org.slf4j" % "slf4j-simple" % "1.6.4"
          +      "org.slf4j" % "slf4j-simple" % "1.6.4",
          +      "com.101tec" % "zkclient" % "0.2",
          +      "com.yammer.metrics" % "metrics-core" % "2.2.0",
          +      "com.yammer.metrics" % "metrics-annotation" % "2.2.0"
               ),
               // The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
               // some dependencies on various sun and javax packages.
          diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala
          index fac723a..853a45c 100644
          --- a/project/build/KafkaProject.scala
          +++ b/project/build/KafkaProject.scala
          @@ -74,7 +74,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
                 <dependency>
                   <groupId>com.yammer.metrics</groupId>
                   <artifactId>metrics-core</artifactId>
          -        <version>3.0.0-SNAPSHOT</version>
          +        <version>2.2.0</version>
                   <scope>compile</scope>
                 </dependency>

          @@ -82,7 +82,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
                 <dependency>
                   <groupId>com.yammer.metrics</groupId>
                   <artifactId>metrics-annotation</artifactId>
          -        <version>3.0.0-SNAPSHOT</version>
          +        <version>2.2.0</version>
                   <scope>compile</scope>
                 </dependency>
          Dragos Manolescu added a comment -

          Let's try this again; I have another patch ready.

          Swapnil Ghike added a comment -

          Hi Dragos, I am still not able to cleanly apply the patch. Was it created off the 0.8 HEAD? Could you rebase in case it helps? Thanks.

          sghike@machine:~/kafka-local/kafka$ patch -p1 --dry-run < ../kafka-fix-for-826.patch
          patching file core/src/main/scala/kafka/cluster/Partition.scala
          patching file core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
          Hunk #1 succeeded at 650 (offset -3 lines).
          patching file core/src/main/scala/kafka/controller/KafkaController.scala
          Hunk #1 succeeded at 97 (offset 2 lines).
          patching file core/src/main/scala/kafka/log/Log.scala
          Hunk #1 succeeded at 130 with fuzz 2 (offset 53 lines).
          patching file core/src/main/scala/kafka/network/RequestChannel.scala
          patching file core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
          patching file core/src/main/scala/kafka/server/AbstractFetcherThread.scala
          Hunk #1 succeeded at 195 (offset -8 lines).
          patching file core/src/main/scala/kafka/server/ReplicaManager.scala
          Hunk #1 FAILED at 56.
          1 out of 1 hunk FAILED -- saving rejects to file core/src/main/scala/kafka/server/ReplicaManager.scala.rej
          patching file core/src/main/scala/kafka/server/RequestPurgatory.scala
          patching file core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
          patching file project/Build.scala
          Hunk #2 FAILED at 34.
          1 out of 2 hunks FAILED -- saving rejects to file project/Build.scala.rej
          patching file project/build/KafkaProject.scala
          Hunk #1 FAILED at 74.
          Hunk #2 FAILED at 82.
          2 out of 2 hunks FAILED -- saving rejects to file project/build/KafkaProject.scala.rej

          Dragos Manolescu added a comment -

          Alternatively, this code is checked into my fork of the project (https://github.com/polymorphic/kafka), on the metrics2 branch.

          Swapnil Ghike added a comment -

          Dragos Manolescu: Could you upload a patch using the attach-files option above? I tried applying your patch using "patch -p1", but it could not be applied. Thanks.

          Scott Carey added a comment -

          The only real trouble with 3.0.x is that:

          • it conflicts with 2.2.x when both are in the same classloader
          • there is no released artifact in a public Maven repo to provide repeatable builds.

          We might be able to find out when a 3.0-alpha may be available and pushed to a public maven repo. It does not appear that a final version is due out in time.
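
          A minimal sketch of that classloader concern, assuming metrics 2.2.0's com.yammer.metrics.core.Gauge (whose abstract method is value()); the object name here is illustrative only:

          import com.yammer.metrics.core.Gauge

          object ClassloaderConflictSketch {
            // Compiles against metrics 2.2.0, where Gauge's abstract method is value().
            val ok: Gauge[Int] = new Gauge[Int] { def value = 42 }

            // The same anonymous class compiled against the 3.0.0-c0c8be71 snapshot
            // would override getValue instead; loaded next to 2.2.0 in one classloader,
            // it fails at runtime (AbstractMethodError) when value() is invoked.
          }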

          Joel Koshy added a comment -

          Thank you for looking into this. Metrics 2.x had a few minor issues with the CsvReporter (which we use in the system tests) and this is why we used 3.x.

          The fixes that I'm aware of are:

          • https://github.com/codahale/metrics/pull/225
          • https://github.com/codahale/metrics/pull/290
          • If a CSV file already exists, metrics throws an IOException and does not resume CSV reporting. This would be the case on a broker bounce, for example. Someone put out a patch for this (https://github.com/adagios/metrics/compare/2.x-maintenance...2.x-epoch-in-csv) but I'd have to check whether that was pulled into metrics-3.x.

          Unfortunately, although the above are small fixes, if we want to use the official 2.x metrics release we would need to copy over the code of the metrics CsvReporter (i.e., into a new implementation of metrics' AbstractReporter), patch in those fixes, and plug that into KafkaMetricsCsvReporter. I don't think it is difficult, but it is a bit clunky (which is why at the time we preferred using 3.x).
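
          For what it's worth, a rough sketch of that workaround, assuming metrics 2.2.0's MetricsRegistry API and its AbstractPollingReporter base (which, if I recall correctly, CsvReporter itself extends); the class name and CSV layout below are illustrative only, not the actual KafkaMetricsCsvReporter wiring:

          import java.io.{File, FileWriter, PrintWriter}
          import com.yammer.metrics.core.{Gauge, MetricsRegistry}
          import com.yammer.metrics.reporting.AbstractPollingReporter

          // Illustrative only: a patched CSV reporter carrying the two small 2.x fixes.
          class PatchedCsvReporter(registry: MetricsRegistry, outputDir: File)
              extends AbstractPollingReporter(registry, "patched-csv-reporter") {

            override def run() {
              val metrics = registry.allMetrics().entrySet().iterator()
              while (metrics.hasNext) {
                val entry = metrics.next()
                entry.getValue match {
                  case g: Gauge[_] =>
                    // Append instead of failing if the file already exists, so CSV
                    // reporting resumes across a broker bounce.
                    val out = new PrintWriter(
                      new FileWriter(new File(outputDir, entry.getKey.getName + ".csv"), true))
                    try out.println(System.currentTimeMillis() + "," + g.value())
                    finally out.close() // flush and release eagerly, the other small fix
                  case _ => // timers, meters, histograms elided in this sketch
                }
              }
            }
          }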

          Dragos Manolescu added a comment -

          diff --git a/core/lib/metrics-annotation-3.0.0-c0c8be71.jar b/core/lib/metrics-annotation-3.0.0-c0c8be71.jar
          deleted file mode 100644
          index dba9d2b..0000000
          Binary files a/core/lib/metrics-annotation-3.0.0-c0c8be71.jar and /dev/null differ
          diff --git a/core/lib/metrics-core-3.0.0-c0c8be71.jar b/core/lib/metrics-core-3.0.0-c0c8be71.jar
          deleted file mode 100644
          index 529a69b..0000000
          Binary files a/core/lib/metrics-core-3.0.0-c0c8be71.jar and /dev/null differ
          diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
          index 367ccd5..7788b30 100644
          --- a/core/src/main/scala/kafka/cluster/Partition.scala
          +++ b/core/src/main/scala/kafka/cluster/Partition.scala
          @@ -60,7 +60,7 @@ class Partition(val topic: String,
             newGauge(
               topic + "-" + partitionId + "-UnderReplicated",
               new Gauge[Int] {
          -      def getValue = {
          +      def value = {
                   if (isUnderReplicated) 1 else 0
                 }
          diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
          index 972d33d..51b9c35 100644
          --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
          +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
          @@ -653,7 +653,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                   newGauge(
                     config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
                     new Gauge[Int] {
          -            def getValue = q.size
          +            def value = q.size
                     }
                   )
                 })
          diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
          index 48eae7e..9c4c8d1 100644
          --- a/core/src/main/scala/kafka/controller/KafkaController.scala
          +++ b/core/src/main/scala/kafka/controller/KafkaController.scala
          @@ -95,7 +95,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             newGauge(
               "ActiveControllerCount",
               new Gauge[Int] {
          -      def getValue() = if (isActive) 1 else 0
          +      def value() = if (isActive) 1 else 0
               }
             )
          diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
          index 631953f..b7b266e 100644
          --- a/core/src/main/scala/kafka/log/Log.scala
          +++ b/core/src/main/scala/kafka/log/Log.scala
          @@ -77,10 +77,10 @@ class Log(val dir: File,
             debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))

             newGauge(name + "-" + "NumLogSegments",
          -           new Gauge[Int] { def getValue = numberOfSegments }
          +           new Gauge[Int] { def value = numberOfSegments }
             )

             newGauge(name + "-" + "LogEndOffset",
          -           new Gauge[Long] { def getValue = logEndOffset }
          +           new Gauge[Long] { def value = logEndOffset }
             )

             /** The name of this log */
             def name = dir.getName()
          diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
          index 209fdfa..c0e0dfc 100644
          --- a/core/src/main/scala/kafka/network/RequestChannel.scala
          +++ b/core/src/main/scala/kafka/network/RequestChannel.scala
          @@ -99,7 +99,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
             newGauge(
               "RequestQueueSize",
               new Gauge[Int] {
          -      def getValue = requestQueue.size
          +      def value = requestQueue.size
               }
             )
          diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
          index 6691147..090400d 100644
          --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
          +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
          @@ -36,7 +36,7 @@ class ProducerSendThread[K,V](val threadName: String,

             newGauge(clientId + "-ProducerQueueSize",
          -           new Gauge[Int] { def getValue = queue.size }
          +           new Gauge[Int] { def value = queue.size }
             )

             override def run {
          diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
          index a7d39b1..006d573 100644
          --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
          +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
          @@ -203,7 +203,7 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
             newGauge(
               metricId + "-ConsumerLag",
               new Gauge[Long] {
          -      def getValue = lagVal.get
          +      def value = lagVal.get
               }
             )
          diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
          index 765d3cb..e1d5bd8 100644
          --- a/core/src/main/scala/kafka/server/ReplicaManager.scala
          +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
          @@ -56,19 +56,19 @@ class ReplicaManager(val config: KafkaConfig,
             newGauge(
               "LeaderCount",
               new Gauge[Int] {
          -      def getValue = leaderPartitions.size
          +      def value = leaderPartitions.size
               }
             )
             newGauge(
               "PartitionCount",
               new Gauge[Int] {
          -      def getValue = allPartitions.size
          +      def value = allPartitions.size
               }
             )
             newGauge(
               "UnderReplicatedPartitions",
               new Gauge[Int] {
          -      def getValue = {
          +      def value = {
                   leaderPartitionsLock synchronized {
                     leaderPartitions.count(_.isUnderReplicated)
                   }
          diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
          index afe9e22..c064c5c 100644
          --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
          +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
          @@ -72,14 +72,14 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
             newGauge(
               "PurgatorySize",
               new Gauge[Int] {
          -      def getValue = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
          +      def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
               }
             )

             newGauge(
               "NumDelayedRequests",
               new Gauge[Int] {
          -      def getValue = expiredRequestReaper.unsatisfied.get()
          +      def value = expiredRequestReaper.unsatisfied.get()
               }
             )
          diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
          index a3f85cf..fe5bc09 100644
          --- a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
          +++ b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
          @@ -35,20 +35,20 @@ class KafkaTimerTest extends JUnit3Suite {
               timer.time {
                 clock.addMillis(1000)
               }
          -    assertEquals(1, metric.getCount())
          -    assertTrue((metric.getMax() - 1000).abs <= Double.Epsilon)
          -    assertTrue((metric.getMin() - 1000).abs <= Double.Epsilon)
          +    assertEquals(1, metric.count())
          +    assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
          +    assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
             }

             private class ManualClock extends Clock {
               private var ticksInNanos = 0L

          -    override def getTick() = {
          +    override def tick() = {
                 ticksInNanos
               }

          -    override def getTime() = {
          +    override def time() = {
                 TimeUnit.NANOSECONDS.toMillis(ticksInNanos)
               }
          diff --git a/project/Build.scala b/project/Build.scala
          index facca79..bc3bc0c 100644
          --- a/project/Build.scala
          +++ b/project/Build.scala
          @@ -17,7 +17,6 @@

           import sbt._
           import Keys._
          -import java.io.File
           import scala.xml.{Node, Elem}
           import scala.xml.transform.{RewriteRule, RuleTransformer}

          @@ -35,7 +34,9 @@ object KafkaBuild extends Build {
               "log4j" % "log4j" % "1.2.15",
               "net.sf.jopt-simple" % "jopt-simple" % "3.2",
               "org.slf4j" % "slf4j-simple" % "1.6.4",
          -    "com.101tec" % "zkclient" % "0.2"
          +    "com.101tec" % "zkclient" % "0.2",
          +    "com.yammer.metrics" % "metrics-core" % "2.2.0",
          +    "com.yammer.metrics" % "metrics-annotation" % "2.2.0"
             ),
             // The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
             // some dependencies on various sun and javax packages.
          diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala
          index 1660fb8..853a45c 100644
          --- a/project/build/KafkaProject.scala
          +++ b/project/build/KafkaProject.scala
          @@ -74,7 +74,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
                 <dependency>
                   <groupId>com.yammer.metrics</groupId>
                   <artifactId>metrics-core</artifactId>
          -        <version>3.0.0-c0c8be71</version>
          +        <version>2.2.0</version>
                   <scope>compile</scope>
                 </dependency>

          @@ -82,7 +82,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
                 <dependency>
                   <groupId>com.yammer.metrics</groupId>
                   <artifactId>metrics-annotation</artifactId>
          -        <version>3.0.0-c0c8be71</version>
          +        <version>2.2.0</version>
                   <scope>compile</scope>
                 </dependency>
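
          In metrics 2.2.0 terms, the source-level change above reduces to the getValue-to-value rename; a minimal, self-contained sketch of the resulting gauge idiom, assuming metrics 2.2.0's MetricsRegistry/MetricName API (the object and metric names here are illustrative, not Kafka's):

          import java.util.concurrent.LinkedBlockingQueue
          import com.yammer.metrics.Metrics
          import com.yammer.metrics.core.{Gauge, MetricName}

          object GaugeSketch extends App {
            private val queue = new LinkedBlockingQueue[String]()

            // metrics 2.2.0 gauges implement value(); the snapshot's getValue() is gone.
            Metrics.defaultRegistry().newGauge(
              new MetricName("kafka.example", "GaugeSketch", "QueueSize"),
              new Gauge[Int] { def value = queue.size })

            queue.put("m1")
            println(Metrics.defaultRegistry().allMetrics().size()) // the gauge is registered
          }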
          Neha Narkhede added a comment -

          We need to remove the dependency on the custom metrics jar in order to mavenize Kafka.
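
          Concretely, once the bundled jars are deleted from core/lib, the sbt build can declare the released artifacts as managed dependencies, mirroring the Build.scala hunk in the patch above:

          libraryDependencies ++= Seq(
            "com.yammer.metrics" % "metrics-core" % "2.2.0",
            "com.yammer.metrics" % "metrics-annotation" % "2.2.0"
          )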

          Dragos Manolescu added a comment -

          Good to see this pop up on your radar screen... I looked at what it would take to move from 3.0.0-c0c8be71 to SNAPSHOT, and there were quite a few changes. I've just forked the project; if I make progress I'll send a pull request.

          Scott Carey added a comment -

          Thank you! We will be able to test and validate this quickly once there is a patch.

          Metrics 3.0.x has hit its first snapshot recently:
          https://groups.google.com/forum/#!topic/metrics-user/c4sPUhLjHEQ

          However, it looks like it won't be done in time for Kafka 0.8. At least it no longer conflicts with a copy of 2.2.x as badly as it did a couple of months ago.


            People

            • Assignee: Dragos Manolescu
            • Reporter: Neha Narkhede
            • Votes: 0
            • Watchers: 11
