diff --git a/config/server.properties b/config/server.properties
index 9ca0f8d..e92f599 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -117,6 +117,7 @@ zk.connectiontimeout.ms=1000000
 # metrics reporter properties
 kafka.metrics.polling.interval.secs=5
 kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
-kafka.csv.metrics.dir=kafka_metrics
-kafka.csv.metrics.reporter.enabled=true
+kafka.csv.metrics.dir=/tmp/kafka_metrics
+# Disable csv reporting by default.
+kafka.csv.metrics.reporter.enabled=false
 
diff --git a/core/lib/metrics-annotation-3.0.0-c0c8be71.jar b/core/lib/metrics-annotation-3.0.0-c0c8be71.jar
new file mode 100644
index 0000000..dba9d2b
Binary files /dev/null and b/core/lib/metrics-annotation-3.0.0-c0c8be71.jar differ
diff --git a/core/lib/metrics-annotation-3.0.1.jar b/core/lib/metrics-annotation-3.0.1.jar
deleted file mode 100644
index 7609fbb..0000000
Binary files a/core/lib/metrics-annotation-3.0.1.jar and /dev/null differ
diff --git a/core/lib/metrics-core-3.0.0-c0c8be71.jar b/core/lib/metrics-core-3.0.0-c0c8be71.jar
new file mode 100644
index 0000000..529a69b
Binary files /dev/null and b/core/lib/metrics-core-3.0.0-c0c8be71.jar differ
diff --git a/core/lib/metrics-core-3.0.1.jar b/core/lib/metrics-core-3.0.1.jar
deleted file mode 100644
index 22d43d5..0000000
Binary files a/core/lib/metrics-core-3.0.1.jar and /dev/null differ
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 830995b..dafb1ee 100644
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -18,7 +18,7 @@
 package kafka
 
 
-import metrics.KafkaCSVMetricsReporter
+import metrics.KafkaMetricsReporter
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
 import utils.{Utils, Logging}
 
@@ -33,7 +33,7 @@ object Kafka extends Logging {
     try {
       val props = Utils.loadProps(args(0))
       val serverConfig = new KafkaConfig(props)
-      KafkaCSVMetricsReporter.startCSVMetricReporter(serverConfig.props)
+      KafkaMetricsReporter.startReporters(serverConfig.props)
       val kafkaServerStartble = new KafkaServerStartable(serverConfig)
 
       // attach shutdown handler to catch control-c
@@ -41,7 +41,7 @@ object Kafka extends Logging {
         override def run() = {
           kafkaServerStartble.shutdown
         }
-      });
+      })
 
       kafkaServerStartble.startup
       kafkaServerStartble.awaitShutdown
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 2d21a2d..79a5cdc 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -26,7 +26,7 @@ import java.io.PrintStream
 import kafka.message._
 import kafka.serializer.StringDecoder
 import kafka.utils._
-import kafka.metrics.KafkaCSVMetricsReporter
+import kafka.metrics.KafkaMetricsReporter
 
 
 /**
@@ -140,7 +140,7 @@ object ConsoleConsumer extends Logging {
         csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics")
       csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true")
       val verifiableProps = new VerifiableProperties(csvReporterProps)
-      KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps)
+      KafkaMetricsReporter.startReporters(verifiableProps)
     }
 
     val props = new Properties()
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 8e6ce17..bea258a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -117,7 +117,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       config.autoCommitIntervalMs, false)
   }
 
-  KafkaCSVMetricsReporter.startCSVMetricReporter(config.props)
+  KafkaMetricsReporter.startReporters(config.props)
 
   def this(config: ConsumerConfig) = this(config, true)
 
diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index 70fb9a5..fda0b24 100644
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -30,26 +30,6 @@ import kafka.utils.{Utils, VerifiableProperties, Logging}
 
 private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean
 
-object KafkaCSVMetricsReporter {
-  val CSVReporterStarted: AtomicBoolean = new AtomicBoolean(false)
-
-  def startCSVMetricReporter (verifiableProps: VerifiableProperties) {
-    CSVReporterStarted synchronized {
-      if (CSVReporterStarted.get() == false) {
-        val metricsConfig = new KafkaMetricsConfig(verifiableProps)
-        if(metricsConfig.reporters.size > 0) {
-          metricsConfig.reporters.foreach(reporterType => {
-            val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
-            reporter.init(verifiableProps)
-            if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
-              Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
-          })
-          CSVReporterStarted.set(true)
-        }
-      }
-    }
-  }
-}
 
 private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
                               with KafkaCSVMetricsReporterMBean
@@ -69,8 +49,8 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
       if (!initialized) {
         val metricsConfig = new KafkaMetricsConfig(props)
         csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics"))
-        if (!csvDir.exists())
-          csvDir.mkdirs()
+        Utils.rm(csvDir)
+        csvDir.mkdirs()
         underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
         if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) {
           initialized = true
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 57f2789..14e4624 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -20,7 +20,9 @@
 
 package kafka.metrics
 
-import kafka.utils.VerifiableProperties
+import kafka.utils.{Utils, VerifiableProperties}
+import java.util.concurrent.atomic.AtomicBoolean
+
 
 /**
  * Base trait for reporter MBeans. If a client wants to expose these JMX
@@ -45,3 +47,24 @@ trait KafkaMetricsReporter {
   def init(props: VerifiableProperties)
 }
 
+object KafkaMetricsReporter {
+  val ReporterStarted: AtomicBoolean = new AtomicBoolean(false)
+
+  def startReporters (verifiableProps: VerifiableProperties) {
+    ReporterStarted synchronized {
+      if (ReporterStarted.get() == false) {
+        val metricsConfig = new KafkaMetricsConfig(verifiableProps)
+        if(metricsConfig.reporters.size > 0) {
+          metricsConfig.reporters.foreach(reporterType => {
+            val reporter = Utils.createObject[KafkaMetricsReporter](reporterType)
+            reporter.init(verifiableProps)
+            if (reporter.isInstanceOf[KafkaMetricsReporterMBean])
+              Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName)
+          })
+          ReporterStarted.set(true)
+        }
+      }
+    }
+  }
+}
+
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 172e108..fbbf07d 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -49,7 +49,7 @@ extends Logging {
     case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
   }
 
-  KafkaCSVMetricsReporter.startCSVMetricReporter(config.props)
+  KafkaMetricsReporter.startReporters(config.props)
 
   def this(config: ProducerConfig) =
     this(config,
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index c343d84..9c4e9a0 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -26,7 +26,7 @@ import java.text.SimpleDateFormat
 import java.util._
 import collection.immutable.List
 import kafka.utils.{VerifiableProperties, Logging}
-import kafka.metrics.KafkaCSVMetricsReporter
+import kafka.metrics.KafkaMetricsReporter
 
 
 /**
@@ -175,7 +175,7 @@ object ProducerPerformance extends Logging {
         props.put("kafka.csv.metrics.dir", "kafka_metrics")
       props.put("kafka.csv.metrics.reporter.enabled", "true")
       val verifiableProps = new VerifiableProperties(props)
-      KafkaCSVMetricsReporter.startCSVMetricReporter(verifiableProps)
+      KafkaMetricsReporter.startReporters(verifiableProps)
     }
 
     val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala
index 2e8526f..2dd010e 100644
--- a/project/build/KafkaProject.scala
+++ b/project/build/KafkaProject.scala
@@ -71,13 +71,13 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
         <dependency>
           <groupId>com.yammer.metrics</groupId>
           <artifactId>metrics-core</artifactId>
-          <version>3.0.1</version>
+          <version>3.0.0-c0c8be71</version>
           <scope>compile</scope>
         </dependency>
         <dependency>
           <groupId>com.yammer.metrics</groupId>
           <artifactId>metrics-annotations</artifactId>
-          <version>3.0.1</version>
+          <version>3.0.0-c0c8be71</version>
           <scope>compile</scope>
         </dependency>
       </dependencies>
