diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index 3c6420c..eee6f8e 100644
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -60,6 +60,7 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
     if (initialized.get()) synchronized {
       if (running == false) {
         underlying.start(pollingPeriodSecs, TimeUnit.SECONDS)
+        running = true
         info("Started Kafka CSV metrics reporter with polling period %d seconds".format(pollingPeriodSecs))
       }
     }
@@ -70,6 +71,7 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
     if (initialized.get()) synchronized {
       if (running == true) {
         underlying.shutdown()
+        running = false
         info("Stopped Kafka CSV metrics reporter")
         underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
       }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1188d4b..b4b39f8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -765,15 +765,15 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
 
 
     def recordDelayedProducerKeyExpired(key: MetricKey) {
-      val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(
-        key, () => new DelayedProducerRequestMetrics(key.keyLabel))
+      lazy val factory = new DelayedProducerRequestMetrics(key.keyLabel)
+      val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key, factory)
       List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
     }
 
 
     def recordDelayedProducerKeyCaughtUp(key: MetricKey, timeToCatchUpNs: Long, bytes: Int) {
-      val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(
-        key, () => new DelayedProducerRequestMetrics(key.keyLabel))
+      lazy val factory = new DelayedProducerRequestMetrics(key.keyLabel)
+      val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key, factory)
       List(keyMetrics, aggregateProduceRequestMetrics).foreach(m => {
         m.caughtUpFollowerFetchRequestMeter.mark()
         m.followerCatchUpTimeHistogram.foreach(_.update(timeToCatchUpNs))
@@ -797,11 +797,11 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
         val topic = topicAndData._1
         topicAndData._2.partitionDataArray.foreach(partitionData => {
           val key = RequestKey(topic, partitionData.partition)
-          val makeNewMetrics = () => new DelayedFetchRequestMetrics(forFollower, key.keyLabel)
+          lazy val factory = new DelayedFetchRequestMetrics(forFollower, key.keyLabel)
           val keyMetrics = if (forFollower)
-            followerFetchRequestMetricsForKey.getAndMaybePut(key, makeNewMetrics)
+            followerFetchRequestMetricsForKey.getAndMaybePut(key, factory)
           else
-            nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key, makeNewMetrics)
+            nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key, factory)
           keyMetrics.throughputMeter.mark(partitionData.sizeInBytes)
         })
       })
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d261a51..a8c8fac 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -36,7 +36,7 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
   val hostName: String = Utils.getString(props, "hostname", InetAddress.getLocalHost.getHostAddress)
 
   /* the broker id for this server */
-  val brokerId: Int = Utils.getInt(props, "brokerid", 0)
+  val brokerId: Int = Utils.getInt(props, "brokerid")
   
   /* the SO_SNDBUFF buffer of the socket sever sockets */
   val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 49efce5..933f57f 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -121,8 +121,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
     else
       w.collectSatisfiedRequests(request)
   }
-  
-  private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key, () => new Watchers)
+
+  private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key, new Watchers)
   
   /**
    * Check if this request satisfied this delayed request
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index ff53eee..b8232c0 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -34,10 +34,22 @@ class Pool[K,V] extends Iterable[(K, V)] {
   
   def putIfNotExists(k: K, v: V) = pool.putIfAbsent(k, v)
 
-  def getAndMaybePut(key: K, createValueIfAbsent:() => V) = {
+  /**
+   * Gets the value associated with the given key. If there is no associated
+   * value, then create the value using the factory method and return the value
+   * associated with the key. The user should declare the factory method as lazy
+   * if its side-effects need to be avoided.
+   *
+   * @param key The key to lookup.
+   * @param createValueIfAbsent Factory method to create the value if absent.
+   * @return The final value associated with the key. This may be different from
+   *         the value created by the factory if another thread successfully
+   *         put a value.
+   */
+  def getAndMaybePut(key: K, createValueIfAbsent: => V) = {
     val curr = pool.get(key)
     if (curr == null) {
-      pool.putIfAbsent(key, createValueIfAbsent())
+      pool.putIfAbsent(key, createValueIfAbsent)
       pool.get(key)
     }
     else
diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
index 351e140..019e515 100644
--- a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
@@ -41,7 +41,7 @@ class KafkaTimerTest extends JUnit3Suite {
     assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
   }
 
-  class ManualClock extends Clock {
+  private class ManualClock extends Clock {
 
     private var ticksInNanos = 0L
 
@@ -56,9 +56,5 @@ class KafkaTimerTest extends JUnit3Suite {
     def addMillis(millis: Long) {
       ticksInNanos += TimeUnit.MILLISECONDS.toNanos(millis)
     }
-
-    def addHours(hours: Long) {
-      ticksInNanos += TimeUnit.HOURS.toNanos(hours)
-    }
   }
 }
diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala
index 0389c59..230c28f 100644
--- a/project/build/KafkaProject.scala
+++ b/project/build/KafkaProject.scala
@@ -251,8 +251,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
   trait CoreDependencies {
     val log4j = "log4j" % "log4j" % "1.2.15"
     val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
-    private val metricsVersion = "latest.release"
-    val metricsCore = "com.yammer.metrics" % "metrics-core" % metricsVersion
+    val metricsCore = "com.yammer.metrics" % "metrics-core" % "latest.release"
     val slf4jSimple = "org.slf4j" % "slf4j-simple" % "latest.release"
   }
   
