diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2ca7ee6..e49bdae 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -60,7 +60,7 @@ class Partition(val topic: String,
newGauge(
topic + "-" + partitionId + "-UnderReplicated",
new Gauge[Int] {
- def getValue = {
+ def value = {
if (isUnderReplicated) 1 else 0
}
}
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 9a5fbfe..398618f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -650,7 +650,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
newGauge(
config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
new Gauge[Int] {
- def getValue = q.size
+ def value = q.size
}
)
})
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 74614d8..5f6eb3c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -97,14 +97,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
newGauge(
"ActiveControllerCount",
new Gauge[Int] {
- def getValue() = if (isActive) 1 else 0
+ def value() = if (isActive) 1 else 0
}
)
newGauge(
"OfflinePartitionsCount",
new Gauge[Int] {
- def getValue: Int = {
+ def value(): Int = {
controllerContext.controllerLock synchronized {
controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader))
}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 7d71451..451775b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -130,10 +130,10 @@ private[kafka] class Log(val dir: File,
debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))
newGauge(name + "-" + "NumLogSegments",
- new Gauge[Int] { def getValue = numberOfSegments })
+ new Gauge[Int] { def value = numberOfSegments })
newGauge(name + "-" + "LogEndOffset",
- new Gauge[Long] { def getValue = logEndOffset })
+ new Gauge[Long] { def value = logEndOffset })
/* The name of this log */
def name = dir.getName()
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 209fdfa..c0e0dfc 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -99,7 +99,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
newGauge(
"RequestQueueSize",
new Gauge[Int] {
- def getValue = requestQueue.size
+ def value = requestQueue.size
}
)
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 6691147..090400d 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -36,7 +36,7 @@ class ProducerSendThread[K,V](val threadName: String,
newGauge(clientId + "-ProducerQueueSize",
new Gauge[Int] {
- def getValue = queue.size
+ def value = queue.size
})
override def run {
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 087979f..2e026e6 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -195,7 +195,7 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
newGauge(
metricId + "-ConsumerLag",
new Gauge[Long] {
- def getValue = lagVal.get
+ def value = lagVal.get
}
)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68e712c..44ad562 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -57,7 +57,7 @@ class ReplicaManager(val config: KafkaConfig,
newGauge(
"LeaderCount",
new Gauge[Int] {
- def getValue = {
+ def value = {
leaderPartitionsLock synchronized {
leaderPartitions.size
}
@@ -67,13 +67,13 @@ class ReplicaManager(val config: KafkaConfig,
newGauge(
"PartitionCount",
new Gauge[Int] {
- def getValue = allPartitions.size
+ def value = allPartitions.size
}
)
newGauge(
"UnderReplicatedPartitions",
new Gauge[Int] {
- def getValue = {
+ def value = {
leaderPartitionsLock synchronized {
leaderPartitions.count(_.isUnderReplicated)
}
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index afe9e22..c064c5c 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -72,14 +72,14 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
newGauge(
"PurgatorySize",
new Gauge[Int] {
- def getValue = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
+ def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
}
)
newGauge(
"NumDelayedRequests",
new Gauge[Int] {
- def getValue = expiredRequestReaper.unsatisfied.get()
+ def value = expiredRequestReaper.unsatisfied.get()
}
)
diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
index a3f85cf..fe5bc09 100644
--- a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
@@ -35,20 +35,20 @@ class KafkaTimerTest extends JUnit3Suite {
timer.time {
clock.addMillis(1000)
}
- assertEquals(1, metric.getCount())
- assertTrue((metric.getMax() - 1000).abs <= Double.Epsilon)
- assertTrue((metric.getMin() - 1000).abs <= Double.Epsilon)
+ assertEquals(1, metric.count())
+ assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
+ assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
}
private class ManualClock extends Clock {
private var ticksInNanos = 0L
- override def getTick() = {
+ override def tick() = {
ticksInNanos
}
- override def getTime() = {
+ override def time() = {
TimeUnit.NANOSECONDS.toMillis(ticksInNanos)
}
diff --git a/project/Build.scala b/project/Build.scala
index 4bbdfee..b8b476b 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -17,7 +17,6 @@
import sbt._
import Keys._
-import java.io.File
import scala.xml.{Node, Elem}
import scala.xml.transform.{RewriteRule, RuleTransformer}
@@ -34,7 +33,10 @@ object KafkaBuild extends Build {
libraryDependencies ++= Seq(
"log4j" % "log4j" % "1.2.15",
"net.sf.jopt-simple" % "jopt-simple" % "3.2",
- "org.slf4j" % "slf4j-simple" % "1.6.4"
+ "org.slf4j" % "slf4j-simple" % "1.6.4",
+ "com.101tec" % "zkclient" % "0.2",
+ "com.yammer.metrics" % "metrics-core" % "2.2.0",
+ "com.yammer.metrics" % "metrics-annotation" % "2.2.0"
),
// The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
// some dependencies on various sun and javax packages.
diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala
index fac723a..853a45c 100644
--- a/project/build/KafkaProject.scala
+++ b/project/build/KafkaProject.scala
@@ -74,7 +74,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
com.yammer.metrics
metrics-core
- 3.0.0-SNAPSHOT
+ 2.2.0
compile
@@ -82,7 +82,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
com.yammer.metrics
metrics-annotation
- 3.0.0-SNAPSHOT
+ 2.2.0
compile