diff --git bin/kafka-run-class.sh bin/kafka-run-class.sh
index e93f670..427a0dc 100755
--- bin/kafka-run-class.sh
+++ bin/kafka-run-class.sh
@@ -51,8 +51,9 @@ done
 if [ -z "$KAFKA_JMX_OPTS" ]; then
   KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "
 fi
+LOG4JCONFIG=${LOG4JCONFIG:-$base_dir/config/log4j.properties}
 if [ -z "$KAFKA_OPTS" ]; then
-  KAFKA_OPTS="-Xmx512M -server  -Dlog4j.configuration=file:$base_dir/config/log4j.properties"
+  KAFKA_OPTS="-Xmx512M -server  -Dlog4j.configuration=file:$LOG4JCONFIG"
 fi
 if [  $JMX_PORT ]; then
   KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
diff --git config/log4j.properties config/log4j.properties
index afe14af..2847664 100644
--- config/log4j.properties
+++ config/log4j.properties
@@ -18,13 +18,13 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
 
-#log4j.appender.fileAppender=org.apache.log4j.FileAppender
-#log4j.appender.fileAppender.File=kafka-request.log
-#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
-#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
-
+log4j.appender.fileAppender=org.apache.log4j.FileAppender
+log4j.appender.fileAppender.File=kafka-apis-broker-2.log
+log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.fileAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
 # Turn on all our debugging info
 #log4j.logger.kafka=INFO
 #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+#log4j.logger.kafka.server.KafkaApis=TRACE, fileAppender
 
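A sketch of the programmatic log4j 1.x equivalent of the commented-out routing above (enabling TRACE for kafka.server.KafkaApis and directing it to its own file appender); the file name and layout mirror the properties configured above:

    import org.apache.log4j.{FileAppender, Level, Logger, PatternLayout}

    object TraceKafkaApis extends App {
      // Same pattern layout and target file as the properties above.
      val appender = new FileAppender(
        new PatternLayout("[%d] %p %m (%c)%n"), "kafka-apis-broker-2.log")
      val logger = Logger.getLogger("kafka.server.KafkaApis")
      logger.setLevel(Level.TRACE)
      logger.addAppender(appender)
    }
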
diff --git config/server0.log4j config/server0.log4j
new file mode 100644
index 0000000..b98a822
--- /dev/null
+++ config/server0.log4j
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, fileAppender
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.fileAppender=org.apache.log4j.FileAppender
+log4j.appender.fileAppender.File=kafka-broker-0.log
+log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.fileAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Turn on all our debugging info
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka.server=TRACE
+log4j.additivity.kafka=false
+
diff --git config/server0.properties config/server0.properties
new file mode 100644
index 0000000..aa2fbc3
--- /dev/null
+++ config/server0.properties
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+brokerid=0
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces, getLocalHost
+# may not be what you want.
+hostname=localhost
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9092
+
+# The number of threads handling network requests
+network.threads=2
+ 
+# The number of threads doing disk I/O
+io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+max.socket.request.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka-1/kafka-logs
+
+# The number of logical partitions per topic per server. More partitions allow greater parallelism
+# for consumption, but also mean more files.
+num.partitions=1
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.default.flush.interval.ms=1000
+
+# Per-topic overrides for log.default.flush.interval.ms
+#topic.flush.intervals.ms=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.default.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.size.
+#log.retention.size=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.file.size=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zk.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+default.replication.factor=2
+
+metrics.graphite.enabled.at.startup=true
+metrics.poll.interval.secs=2
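The two metrics.* entries above are what switch the new metrics subsystem on for this broker: the graphite reporter starts at boot and polls every 2 seconds. A minimal sketch of how these properties are consumed, reusing the same loadProps helper that Kafka.scala uses below (nothing beyond this diff is assumed):

    import java.util.Properties
    import kafka.utils.Utils
    import kafka.metrics.KafkaMetrics

    object MetricsFromProps extends App {
      // Load the file the same way Kafka.scala loads its config argument.
      val props: Properties = Utils.loadProps("config/server0.properties")
      // The KafkaMetrics constructor reads the metrics.* keys through
      // KafkaMetricsConfigShared; since metrics.graphite.enabled.at.startup=true,
      // the graphite reporter starts immediately with a 2-second polling period.
      val metrics = new KafkaMetrics(props)
    }
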
diff --git config/server1.log4j config/server1.log4j
new file mode 100644
index 0000000..fccb3cb
--- /dev/null
+++ config/server1.log4j
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, fileAppender
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.fileAppender=org.apache.log4j.FileAppender
+log4j.appender.fileAppender.File=kafka-broker-1.log
+log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.fileAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Turn on all our debugging info
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.additivity.kafka=false
+log4j.logger.kafka.server=TRACE
+
diff --git config/server1.properties config/server1.properties
new file mode 100644
index 0000000..e51d376
--- /dev/null
+++ config/server1.properties
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+brokerid=1
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces, getLocalHost
+# may not be what you want.
+hostname=localhost
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9093
+
+# The number of threads handling network requests
+network.threads=2
+ 
+# The number of threads doing disk I/O
+io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+max.socket.request.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka-2/kafka-logs
+
+# The number of logical partitions per topic per server. More partitions allow greater parallelism
+# for consumption, but also mean more files.
+num.partitions=1
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.default.flush.interval.ms=1000
+
+# Per-topic overrides for log.default.flush.interval.ms
+#topic.flush.intervals.ms=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.default.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.size.
+#log.retention.size=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.file.size=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zk.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+default.replication.factor=2
+
+metrics.graphite.enabled.at.startup=true
+metrics.poll.interval.secs=2
diff --git core/src/main/scala/kafka/Kafka.scala core/src/main/scala/kafka/Kafka.scala
index 8e2537d..1bc8d69 100644
--- core/src/main/scala/kafka/Kafka.scala
+++ core/src/main/scala/kafka/Kafka.scala
@@ -17,6 +17,8 @@
 
 package kafka
 
+
+import metrics.KafkaMetrics
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
 import utils.{Utils, Logging}
 import org.apache.log4j.jmx.LoggerDynamicMBean
@@ -36,6 +38,8 @@ object Kafka extends Logging {
     try {
       val props = Utils.loadProps(args(0))
       val serverConfig = new KafkaConfig(props)
+      val kafkaMetrics = new KafkaMetrics(props)
+      Utils.registerMBean(kafkaMetrics, KafkaMetrics.kafkaMetricsMBeanName)
 
       val kafkaServerStartble = new KafkaServerStartable(serverConfig)
 
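Utils.registerMBean is a pre-existing helper; a hedged sketch of the registration it presumably performs (the real code lives in kafka.utils.Utils), which is what makes the reporter start/stop operations reachable from jconsole or any JMX client:

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    // Assumed behavior: publish the object on the platform MBean server under
    // the given name, here "kafka:type=kafka.KafkaMetrics".
    def registerMBeanSketch(mbean: AnyRef, name: String) {
      ManagementFactory.getPlatformMBeanServer.registerMBean(mbean, new ObjectName(name))
    }
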
diff --git core/src/main/scala/kafka/api/FetchResponse.scala core/src/main/scala/kafka/api/FetchResponse.scala
index c475ed7..1d581d4 100644
--- core/src/main/scala/kafka/api/FetchResponse.scala
+++ core/src/main/scala/kafka/api/FetchResponse.scala
@@ -73,7 +73,6 @@ class PartitionDataSend(val partitionData: PartitionData) extends Send {
   }
 }
 
-
 object TopicData {
   def readFrom(buffer: ByteBuffer): TopicData = {
     val topic = Utils.readShortString(buffer, "UTF-8")
diff --git core/src/main/scala/kafka/metrics/KafkaMetrics.scala core/src/main/scala/kafka/metrics/KafkaMetrics.scala
new file mode 100644
index 0000000..9585392
--- /dev/null
+++ core/src/main/scala/kafka/metrics/KafkaMetrics.scala
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.metrics
+
+
+import com.yammer.metrics.reporting.{GangliaReporter, GraphiteReporter, CsvReporter}
+import java.util.concurrent.TimeUnit
+import com.yammer.metrics.Metrics
+import java.io.File
+import java.util.Properties
+import kafka.utils.Logging
+
+
+object KafkaMetrics {
+  val kafkaMetricsMBeanName = "kafka:type=kafka.KafkaMetrics"
+}
+
+class KafkaMetrics(props: Properties) extends KafkaMetricsMBean
+                                      with KafkaMetricsConfigShared
+                                      with Logging {
+  val csvDir = new File(csvReporterDir)
+  val csvReporter = new CsvReporter(Metrics.defaultRegistry(), csvDir)
+  val graphiteReporter = new GraphiteReporter(graphiteCarbonHost, graphiteCarbonPort, null)
+  val gangliaReporter = new GangliaReporter(gangliaHost, gangliaPort)
+
+  if (csvReporterEnabledAtStartUp) startCsvReporter(metricsPollInterval)
+  if (graphiteReporterEnabledAtStartUp) startGraphiteReporter(metricsPollInterval)
+  if (gangliaReporterEnabledAtStartUp) startGangliaReporter(metricsPollInterval)
+
+  override def kafkaMetricsConfigProps = props
+
+  def startCsvReporter(pollingPeriodSecs: Long) {
+    if (!csvDir.exists())
+      csvDir.mkdirs()
+    csvReporter.start(pollingPeriodSecs, TimeUnit.SECONDS)
+    info("Started metrics CSV reporter with polling period %d seconds".format(pollingPeriodSecs))
+  }
+
+  def startGraphiteReporter(pollingPeriodSecs: Long) {
+    graphiteReporter.start(pollingPeriodSecs, TimeUnit.SECONDS)
+    info("Started metrics graphite reporter with polling period %d seconds".format(pollingPeriodSecs))
+  }
+
+  def startGangliaReporter(pollingPeriodSecs: Long) {
+    gangliaReporter.start(pollingPeriodSecs, TimeUnit.SECONDS)
+    info("Started metrics ganglia reporter with polling period %d seconds".format(pollingPeriodSecs))
+  }
+
+  def stopCsvReporter() {
+    csvReporter.shutdown()
+    info("Stopped metrics CSV reporter")
+  }
+
+  def stopGraphiteReporter() {
+    graphiteReporter.shutdown()
+    info("Stopped metrics graphite reporter")
+  }
+
+  def stopGangliaReporter() {
+    gangliaReporter.shutdown()
+    info("Stopped metrics ganglia reporter")
+  }
+}
+
+trait KafkaMetricsMBean {
+  def startCsvReporter(pollingPeriodInSeconds: Long)
+  def startGraphiteReporter(pollingPeriodInSeconds: Long)
+  def startGangliaReporter(pollingPeriodInSeconds: Long)
+
+  def stopCsvReporter()
+  def stopGraphiteReporter()
+  def stopGangliaReporter()
+}
+
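Since the start/stop methods are exposed through KafkaMetricsMBean, the reporters can be toggled on a live broker. A sketch using the standard javax.management client API, assuming the broker was started with the JMX options from kafka-run-class.sh and JMX_PORT=9999 (the port is a placeholder):

    import javax.management.ObjectName
    import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}
    import kafka.metrics.KafkaMetrics

    object StartCsvReporterRemotely extends App {
      val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")
      val connector = JMXConnectorFactory.connect(url)
      try {
        // Invoke KafkaMetricsMBean.startCsvReporter with a 5-second polling period.
        connector.getMBeanServerConnection.invoke(
          new ObjectName(KafkaMetrics.kafkaMetricsMBeanName),
          "startCsvReporter",
          Array[AnyRef](java.lang.Long.valueOf(5L)),
          Array("long"))
      } finally {
        connector.close()
      }
    }
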
diff --git core/src/main/scala/kafka/metrics/KafkaMetricsConfigShared.scala core/src/main/scala/kafka/metrics/KafkaMetricsConfigShared.scala
new file mode 100644
index 0000000..1537704
--- /dev/null
+++ core/src/main/scala/kafka/metrics/KafkaMetricsConfigShared.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.metrics
+
+import java.util.Properties
+import kafka.utils.Utils
+
+trait KafkaMetricsConfigShared {
+
+  def kafkaMetricsConfigProps: Properties
+
+  val metricsPollInterval = Utils.getInt(kafkaMetricsConfigProps, "metrics.poll.interval.secs", 60)
+
+  val graphiteCarbonHost = Utils.getString(kafkaMetricsConfigProps, "metrics.graphite.carbon.host", "127.0.0.1")
+  val graphiteCarbonPort = Utils.getInt(kafkaMetricsConfigProps, "metrics.graphite.carbon.port", 2003)
+  val graphiteReporterEnabledAtStartUp = Utils.getBoolean(
+    kafkaMetricsConfigProps, "metrics.graphite.enabled.at.startup", default = false)
+
+  val gangliaHost = Utils.getString(kafkaMetricsConfigProps, "metrics.ganglia.host", "127.0.0.1")
+  val gangliaPort = Utils.getInt(kafkaMetricsConfigProps, "metrics.ganglia.port", 8649)
+  val gangliaReporterEnabledAtStartUp = Utils.getBoolean(
+    kafkaMetricsConfigProps, "metrics.ganglia.enabled.at.startup", default = false)
+
+  val csvReporterDir = Utils.getString(kafkaMetricsConfigProps, "metrics.csv.dir", "kafka_metrics")
+  val csvReporterEnabledAtStartUp = Utils.getBoolean(
+    kafkaMetricsConfigProps, "metrics.csv.enabled.at.startup", default = false)
+}
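The trait reads its configuration once, at construction, from whatever Properties the mixing class supplies; with an empty Properties everything falls back to the defaults listed above. A small self-check sketch:

    import java.util.Properties
    import kafka.metrics.KafkaMetricsConfigShared

    object MetricsDefaults extends App {
      val empty = new Properties
      val cfg = new KafkaMetricsConfigShared {
        def kafkaMetricsConfigProps = empty
      }
      // Defaults: all three reporters disabled at startup, 60s polling,
      // carbon at 127.0.0.1:2003, ganglia at 127.0.0.1:8649, CSV dir "kafka_metrics".
      assert(!cfg.csvReporterEnabledAtStartUp && cfg.metricsPollInterval == 60)
    }
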
diff --git core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
new file mode 100644
index 0000000..19f640b
--- /dev/null
+++ core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.metrics
+
+
+import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.utils.Logging
+import java.util.concurrent.TimeUnit
+import com.yammer.metrics.Metrics
+
+
+trait KafkaMetricsGroup extends Logging {
+
+  def metricsGroupIdent: String
+
+  private def metricName(name: String) = {
+    val ident = metricsGroupIdent
+    val klass = this.getClass
+    val pkg = {
+      val actualPkg = if (klass.getPackage == null) "" else klass.getPackage.getName
+      if (ident.nonEmpty) {
+        if (actualPkg.contains("."))
+          actualPkg.replaceFirst("""\.""", ".%s.".format(ident))
+        else
+          actualPkg + "." + ident
+      }
+      else
+        actualPkg
+    }
+    val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
+    new MetricName(pkg, simpleName, name)
+  }
+
+  def newGauge[T](name: String, metric: Gauge[T]) =
+    Metrics.newGauge(metricName(name), metric)
+
+  def newMeter(name: String, eventType: String, timeUnit: TimeUnit) =
+    Metrics.newMeter(metricName(name), eventType, timeUnit)
+
+  def newHistogram(name: String) = Metrics.newHistogram(metricName(name))
+
+}
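metricName splices the group ident into the package name right after its first segment, so metrics from different brokers stay distinguishable. The rewriting step, shown in isolation for a hypothetical class in package kafka.server with metricsGroupIdent "0":

    object MetricNameExample extends App {
      val actualPkg = "kafka.server"  // package of the class mixing in the trait
      val ident = "0"                 // metricsGroupIdent, e.g. the broker id
      val rewritten = actualPkg.replaceFirst("""\.""", ".%s.".format(ident))
      println(rewritten)              // prints "kafka.0.server"
      // A meter "RequestsPerSecond" on such a class is therefore registered as
      // MetricName("kafka.0.server", <SimpleName>, "RequestsPerSecond").
    }
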
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index 22ddec8..1d773c2 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -25,14 +25,17 @@ import kafka.common._
 import kafka.log._
 import kafka.message._
 import kafka.network._
+import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
 import scala.math._
 import kafka.network.RequestChannel.Response
-import kafka.utils.{ZkUtils, SystemTime, Logging}
+import java.util.concurrent.TimeUnit
+import kafka.metrics.KafkaMetricsGroup
 import kafka.cluster.Replica
 
+
 /**
  * Logic to handle the various Kafka requests
  */
@@ -44,10 +47,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
                 becomeFollower: (Replica, LeaderAndISR) => Short,
                 brokerId: Int) extends Logging {
 
-  private val produceRequestPurgatory = new ProducerRequestPurgatory(brokerId)
-  private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
+  private val metricsGroup = replicaManager.config.brokerId.toString
+  private val producerRequestPurgatory = new ProducerRequestPurgatory
+  private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
+  private val delayedRequestMetrics = new DelayedRequestMetrics
+
   private val requestLogger = Logger.getLogger("kafka.request.logger")
-  this.logIdent = "KafkaApi on Broker " + brokerId + ", "
+  this.logIdent = "KafkaApis-%d ".format(brokerId)
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -133,12 +139,17 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) {
     var satisfied = new mutable.ArrayBuffer[DelayedFetch]
     for(partitionData <- partitionDatas)
-      satisfied ++= fetchRequestPurgatory.update((topic, partitionData.partition), partitionData)
-    trace("produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
+      satisfied ++= fetchRequestPurgatory.update(FetchRequestKey(topic, partitionData.partition), null)
+    trace("Produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
       val topicData = readMessageSets(fetchReq.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
+
+      val fromFollower = fetchReq.fetch.replicaId != FetchRequest.NonFollowerId
+      delayedRequestMetrics.recordDelayedFetchSatisfied(
+        fromFollower, SystemTime.nanoseconds - fetchReq.creationTimeNs, response)
+
       requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
     }
   }
@@ -150,43 +161,45 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
     val produceRequest = ProducerRequest.readFrom(request.request.buffer)
     val sTime = SystemTime.milliseconds
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("producer request %s".format(produceRequest.toString))
-    trace("Broker %s received produce request %s".format(logManager.config.brokerId, produceRequest.toString))
+      requestLogger.trace("Producer request %s".format(produceRequest.toString))
 
     val response = produceToLocalLog(produceRequest)
-    debug("produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
+    for (topicData <- produceRequest.data)
+      maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
+    
     if (produceRequest.requiredAcks == 0 ||
         produceRequest.requiredAcks == 1 ||
         produceRequest.data.size <= 0) {
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
-
-      for (topicData <- produceRequest.data)
-        maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
     }
     else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val topicPartitionPairs = produceRequest.data.flatMap(topicData => {
+      val producerRequestKeys = produceRequest.data.flatMap(topicData => {
         val topic = topicData.topic
         topicData.partitionDataArray.map(partitionData => {
-          (topic, partitionData.partition)
+          ProducerRequestKey(topic, partitionData.partition)
         })
       })
+
       val delayedProduce = new DelayedProduce(
-        topicPartitionPairs, request,
+        producerRequestKeys, request,
         response.errors, response.offsets,
         produceRequest, produceRequest.ackTimeoutMs.toLong)
-      produceRequestPurgatory.watch(delayedProduce)
+      producerRequestPurgatory.watch(delayedProduce)
+
       /*
        * Replica fetch requests may have arrived (and potentially satisfied)
-       * delayedProduce requests before they even made it to the purgatory.
+       * delayedProduce requests while they were being added to the purgatory.
        * Here, we explicitly check if any of them can be satisfied.
        */
       var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
-      topicPartitionPairs.foreach(topicPartition =>
-                                    satisfiedProduceRequests ++=
-                                            produceRequestPurgatory.update(topicPartition, topicPartition))
-      debug("%d DelayedProduce requests unblocked after produce to local log.".format(satisfiedProduceRequests.size))
+      producerRequestKeys.foreach(key =>
+        satisfiedProduceRequests ++=
+          producerRequestPurgatory.update(key, (key.topic, key.partition)))
+      debug(satisfiedProduceRequests.size +
+        " DelayedProduce requests unblocked during produce to local log.")
       satisfiedProduceRequests.foreach(_.respond())
     }
   }
@@ -212,12 +225,12 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
           replicaManager.recordLeaderLogEndOffset(topicData.topic, partitionData.partition, log.logEndOffset)
           offsets(msgIndex) = log.logEndOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
-          trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
+          trace("%d bytes written to logs, nextAppendOffset = %d".format(partitionData.messages.sizeInBytes, offsets(msgIndex)))
         } catch {
           case e =>
             BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
             BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-            error("error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
+            error("Error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
             e match {
               case _: IOException =>
                 fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
@@ -229,8 +242,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
         }
       }
     }
-    val ret = new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
-    ret
+    new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
   }
 
   /**
@@ -238,7 +250,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
    */
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = FetchRequest.readFrom(request.request.buffer)
-    trace("handling fetch request: " + fetchRequest.toString)
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Handling fetch request: " + fetchRequest.toString)
     // validate the request
     try {
       fetchRequest.validate()
@@ -255,12 +268,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
       var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
       fetchRequest.offsetInfo.foreach(topicOffsetInfo => {
         topicOffsetInfo.partitions.foreach(partition => {
-          satisfiedProduceRequests ++= produceRequestPurgatory.update(
-            (topicOffsetInfo.topic, partition), (topicOffsetInfo.topic, partition)
+          satisfiedProduceRequests ++= producerRequestPurgatory.update(
+            ProducerRequestKey(topicOffsetInfo.topic, partition),
+            (topicOffsetInfo.topic, partition)
           )
         })
       })
-      debug("replica %d fetch unblocked %d DelayedProduce requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size))
+      debug("Replica %d fetch unblocked %d DelayedProduce requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size))
       satisfiedProduceRequests.foreach(_.respond())
     }
 
@@ -270,14 +284,15 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
        availableBytes >= fetchRequest.minBytes ||
        fetchRequest.numPartitions <= 0) {
       val topicData = readMessageSets(fetchRequest)
-      debug("returning fetch response %s for fetch request with correlation id %d".format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
+      debug("Returning fetch response %s for fetch request with correlation id %d"
+        .format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
-      debug("putting fetch request into purgatory")
+      debug("Putting fetch request into purgatory")
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val topicPartitionPairs: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
-      val delayedFetch = new DelayedFetch(topicPartitionPairs, request, fetchRequest, fetchRequest.maxWait, availableBytes)
+      val delayedFetchKeys = fetchRequest.offsetInfo.flatMap(o => o.partitions.map(FetchRequestKey(o.topic, _)))
+      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait)
       fetchRequestPurgatory.watch(delayedFetch)
     }
   }
@@ -298,10 +313,11 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
           totalBytes += math.min(offsetDetail.fetchSizes(i), available)
         } catch {
           case e: InvalidPartitionException =>
-            info("invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
+            info("Invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
         }
       }
     }
+    trace(totalBytes + " available bytes for fetch request.")
     totalBytes
   }
 
@@ -342,18 +358,18 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
           case Right(messages) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
             BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
-            val leaderReplicaOpt = replicaManager.getReplica(topic, partition, logManager.config.brokerId)
-            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(logManager.config.brokerId))
+            val leaderReplicaOpt = replicaManager.getReplica(topic, partition, brokerId)
+            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(brokerId))
             val leaderReplica = leaderReplicaOpt.get
             fetchRequest.replicaId match {
               case FetchRequest.NonFollowerId => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
                 new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
               case _ => // fetch request from a follower
                 val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
-                assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, replicaManager.config.brokerId))
+                assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, brokerId))
                 val replica = replicaOpt.get
-                debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
-                debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
                 new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
             }
         }
@@ -389,7 +405,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   def handleOffsetRequest(request: RequestChannel.Request) {
     val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("offset request " + offsetRequest.toString)
+      requestLogger.trace("Offset request " + offsetRequest.toString)
     var response: OffsetResponse = null
     try {
       kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition)
@@ -412,11 +428,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
-      requestLogger.trace("topic metadata request " + metadataRequest.toString())
+      requestLogger.trace("Topic metadata request " + metadataRequest.toString())
+
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val zkClient = kafkaZookeeper.getZookeeperClient
     var errorCode = ErrorMapping.NoError
     val config = logManager.config
+
     try {
       val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
       metadataRequest.topics.zip(topicMetadataList).foreach(
@@ -452,33 +470,42 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   }
 
   def close() {
-    debug("shut down")
+    debug("Shutting down.")
     fetchRequestPurgatory.shutdown()
-    produceRequestPurgatory.shutdown()
-    debug("shutted down completely")
+    producerRequestPurgatory.shutdown()
+    debug("Shut down complete.")
+  }
+
+  private [kafka] trait MetricKey {
+    def keyLabel: String
+  }
+  private [kafka] object MetricKey {
+    val globalLabel = "all"
   }
 
+  private [kafka] case class FetchRequestKey(topic: String, partition: Int)
+          extends MetricKey {
+    override def keyLabel = "%s-%d".format(topic, partition)
+  }
   /**
    * A delayed fetch request
    */
-  class DelayedFetch(keys: Seq[Any], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) extends DelayedRequest(keys, request, delayMs) {
-    val bytesAccumulated = new AtomicLong(initialSize)
-  }
+  class DelayedFetch(keys: Seq[FetchRequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long) extends DelayedRequest(keys, request, delayMs)
 
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData]("Fetch Request Purgatory on Broker " + brokerId + ", ") {
+  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
+
+    this.logIdent = "FetchRequestPurgatory-%d ".format(brokerId)
+
+    override def metricsGroupIdent = metricsGroup
 
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
-    def checkSatisfied(partitionData: PartitionData, delayedFetch: DelayedFetch): Boolean = {
-      val messageDataSize = partitionData.messages.sizeInBytes
-      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
-      debug("fetch request check, accm size: " + accumulatedSize + " delay fetch min bytes: " + delayedFetch.fetch.minBytes)
-      accumulatedSize >= delayedFetch.fetch.minBytes
-    }
+    def checkSatisfied(n: Null, delayedFetch: DelayedFetch): Boolean =
+      availableFetchBytes(delayedFetch.fetch) >= delayedFetch.fetch.minBytes
 
     /**
      * When a request expires just answer it with whatever data is present
@@ -486,11 +513,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
     def expire(delayed: DelayedFetch) {
       val topicData = readMessageSets(delayed.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
+      val fromFollower = delayed.fetch.replicaId != FetchRequest.NonFollowerId
+      delayedRequestMetrics.recordDelayedFetchExpired(fromFollower, response)
       requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
     }
   }
 
-  class DelayedProduce(keys: Seq[Any],
+  class DelayedProduce(keys: Seq[ProducerRequestKey],
                        request: RequestChannel.Request,
                        localErrors: Array[Short],
                        requiredOffsets: Array[Long],
@@ -504,7 +533,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
      * values are effectively synchronized by the ProducerRequestPurgatory's
      * update method
      */
-    private val partitionStatus = keys.map(key => {
+    private [kafka] val partitionStatus = keys.map(key => {
       val keyIndex = keys.indexOf(key)
       // if there was an error in writing to the local replica's log, then don't
       // wait for acks on this partition
@@ -525,13 +554,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
 
     def respond() {
       val errorsAndOffsets: (List[Short], List[Long]) = (
-              keys.foldRight
-                      ((List[Short](), List[Long]()))
-                      ((key: Any, result: (List[Short], List[Long])) => {
-                        val status = partitionStatus(key)
-                        (status.error :: result._1, status.requiredOffset :: result._2)
-                      })
-              )
+        keys.foldRight
+          ((List[Short](), List[Long]()))
+          ((key: ProducerRequestKey, result: (List[Short], List[Long])) => {
+            val status = partitionStatus(key)
+            (status.error :: result._1, status.requiredOffset :: result._2)
+          })
+        )
       val response = new ProducerResponse(produce.versionId, produce.correlationId,
                                           errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
 
@@ -552,7 +581,9 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
      */
     def isSatisfied(followerFetchPartition: (String, Int)) = {
       val (topic, partitionId) = followerFetchPartition
-      val fetchPartitionStatus = partitionStatus(followerFetchPartition)
+      val key = ProducerRequestKey(topic, partitionId)
+      val fetchPartitionStatus = partitionStatus(key)
+      trace("Checking DelayedProduce satisfaction for %s-%d, acksPending = %b".format(topic, partitionId, fetchPartitionStatus.acksPending))
       if (fetchPartitionStatus.acksPending) {
         val leaderReplica = replicaManager.getLeaderReplica(topic, partitionId)
         leaderReplica match {
@@ -560,14 +591,16 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
             if (leader.isLocal) {
               val isr = leader.partition.inSyncReplicas
               val numAcks = isr.count(r => {
-                if (!r.isLocal)
-                  r.logEndOffset() >= partitionStatus(followerFetchPartition).requiredOffset
+                if (!r.isLocal) {
+                  r.logEndOffset() >= partitionStatus(key).requiredOffset
+                }
                 else
                   true /* also count the local (leader) replica */
               })
-              trace("Received %d/%d acks for produce request to %s-%d".format(
+
+              trace("Received %d/%d acks for produce request to %s-%d; isr size = %d".format(
                 numAcks, produce.requiredAcks,
-                topic, partitionId))
+                topic, partitionId, isr.size))
               if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
                       (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
                 /*
@@ -575,12 +608,16 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
                  * are fully caught up to the (local) leader's offset
                  * corresponding to this produce request.
                  */
+
                 fetchPartitionStatus.acksPending = false
                 fetchPartitionStatus.error = ErrorMapping.NoError
                 val topicData =
                   produce.data.find(_.topic == topic).get
                 val partitionData =
                   topicData.partitionDataArray.find(_.partition == partitionId).get
+                delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key,
+                                                                       SystemTime.nanoseconds - creationTimeNs,
+                                                                       partitionData.sizeInBytes)
                 maybeUnblockDelayedFetchRequests(
                   topic, Array(partitionData))
               }
@@ -597,7 +634,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
       }
 
       // unblocked if there are no partitions with pending acks
-      ! partitionStatus.exists(p => p._2.acksPending)
+      val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
+      satisfied
     }
 
     class PartitionStatus(var acksPending: Boolean,
@@ -615,21 +653,151 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
     }
   }
 
+  private [kafka] case class ProducerRequestKey(topic: String, partition: Int)
+          extends MetricKey {
+    def keyLabel = "%s-%d".format(topic, partition)
+  }
   /**
    * A holding pen for produce requests waiting to be satisfied.
    */
-  private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, (String, Int)]("Producer Request Purgatory on Broker " + brokerId + ", ") {
+  private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, (String, Int)](brokerId) {
+
+
+    this.logIdent = "ProducerRequestPurgatory-%d ".format(brokerId)
+
+    override def metricsGroupIdent = metricsGroup
 
-    protected def checkSatisfied(fetchRequestPartition: (String, Int),
+    protected def checkSatisfied(followerFetchPartition: (String, Int),
                                  delayedProduce: DelayedProduce) =
-      delayedProduce.isSatisfied(fetchRequestPartition)
+      delayedProduce.isSatisfied(followerFetchPartition)
 
     /**
      * Handle an expired delayed request
      */
     protected def expire(delayedProduce: DelayedProduce) {
+      for (partitionStatus <- delayedProduce.partitionStatus if partitionStatus._2.acksPending)
+        delayedRequestMetrics.recordDelayedProducerKeyExpired(partitionStatus._1)
+
       delayedProduce.respond()
     }
   }
+
+  private class DelayedRequestMetrics {
+
+
+    private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
+      override def metricsGroupIdent = metricsGroup
+      val caughtUpFollowerFetchRequestMeter =
+        newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
+      val followerCatchUpTimeHistogram = newHistogram("FollowerCatchUpTimeInNs-" + keyLabel)
+      /*
+       * Note that throughput is updated on individual key satisfaction, so this
+       * meter is an upper bound on the true throughput: the enclosing
+       * DelayedProducerRequest may still expire before every key is satisfied.
+       */
+      val throughputMeter = newMeter("Throughput-" + keyLabel, "bytes", TimeUnit.SECONDS)
+      val expiredRequestMeter = newMeter("ExpiredRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
+    }
+
+
+    private class DelayedFetchRequestMetrics(forFollower: Boolean,
+                                             keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
+      private val metricPrefix = if (forFollower) "Follower" else "NonFollower"
+
+      override def metricsGroupIdent = metricsGroup
+      val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
+        Some(newMeter("%s-SatisfiedRequestsPerSecond-%s".format(metricPrefix, keyLabel),
+          "requests", TimeUnit.SECONDS))
+      else None
+
+      val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel)
+        Some(newHistogram("%s-SatisfactionTimeInNs-%s".format(metricPrefix, keyLabel)))
+      else None
+
+      val expiredRequestMeter = if (keyLabel == MetricKey.globalLabel)
+        Some(newMeter("%s-ExpiredRequestsPerSecond-%s".format(metricPrefix, keyLabel),
+          "requests", TimeUnit.SECONDS))
+      else None
+
+      val throughputMeter = newMeter("%s-Throughput-%s".format(metricPrefix, keyLabel),
+        "bytes", TimeUnit.SECONDS)
+    }
+
+
+    private val producerRequestMetricsForKey =
+      new Pool[MetricKey, DelayedProducerRequestMetrics]
+    private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
+
+    private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
+    private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
+
+    private val followerFetchRequestMetricsForKey =
+      new Pool[MetricKey, DelayedFetchRequestMetrics]
+    private val nonFollowerFetchRequestMetricsForKey =
+      new Pool[MetricKey, DelayedFetchRequestMetrics]
+
+
+    def recordDelayedProducerKeyExpired(key: MetricKey) {
+      val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(
+        key, new DelayedProducerRequestMetrics(key.keyLabel))
+      List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
+    }
+
+
+    def recordDelayedProducerKeyCaughtUp(key: MetricKey, timeToCatchUpNs: Long, bytes: Int) {
+      val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(
+        key, new DelayedProducerRequestMetrics(key.keyLabel))
+      List(keyMetrics, aggregateProduceRequestMetrics).foreach(m => {
+        m.caughtUpFollowerFetchRequestMeter.mark()
+        m.followerCatchUpTimeHistogram.update(timeToCatchUpNs)
+        m.throughputMeter.mark(bytes)
+      })
+    }
+
+
+    private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) {
+      val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+        else aggregateNonFollowerFetchRequestMetrics
+      metrics.throughputMeter.mark(response.sizeInBytes)
+
+      response.topicMap.foreach(topicAndData => {
+        val topic = topicAndData._1
+        topicAndData._2.partitionDataArray.foreach(partitionData => {
+          val key = FetchRequestKey(topic, partitionData.partition)
+          val newMetrics = new DelayedFetchRequestMetrics(forFollower, key.keyLabel)
+          val keyMetrics = if (forFollower)
+            followerFetchRequestMetricsForKey.getAndMaybePut(key, newMetrics)
+          else
+            nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key, newMetrics)
+          keyMetrics.throughputMeter.mark(partitionData.sizeInBytes)
+        })
+      })
+    }
+
+
+    def recordDelayedFetchExpired(forFollower: Boolean, response: FetchResponse) {
+      val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+        else aggregateNonFollowerFetchRequestMetrics
+      if (metrics.expiredRequestMeter.isDefined)
+        metrics.expiredRequestMeter.get.mark()
+
+      recordDelayedFetchThroughput(forFollower, response)
+    }
+
+
+    def recordDelayedFetchSatisfied(forFollower: Boolean, timeToSatisfyNs: Long, response: FetchResponse) {
+      val aggregateMetrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+        else aggregateNonFollowerFetchRequestMetrics
+
+
+      if (aggregateMetrics.satisfactionTimeHistogram.isDefined)
+        aggregateMetrics.satisfactionTimeHistogram.get.update(timeToSatisfyNs)
+
+      if (aggregateMetrics.satisfiedRequestMeter.isDefined)
+        aggregateMetrics.satisfiedRequestMeter.get.mark()
+
+      recordDelayedFetchThroughput(forFollower, response)
+    }
+  }
 }
 
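The keyed metrics above combine the broker-id group ident with MetricKey.keyLabel, so every topic-partition gets its own meters next to the "all" aggregates. A hedged illustration of the resulting name for a hypothetical topic "test", partition 1, on broker 0:

    import com.yammer.metrics.core.MetricName

    object KeyedMetricNameExample extends App {
      // ProducerRequestKey("test", 1).keyLabel is "test-1"; with metricsGroupIdent
      // "0", the caught-up meter should be registered under roughly:
      val name = new MetricName("kafka.0.server", "DelayedProducerRequestMetrics",
                                "CaughtUpFollowerFetchRequestsPerSecond-test-1")
      println(name)
    }
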
diff --git core/src/main/scala/kafka/server/KafkaConfig.scala core/src/main/scala/kafka/server/KafkaConfig.scala
index c8b7dc3..6379aca 100644
--- core/src/main/scala/kafka/server/KafkaConfig.scala
+++ core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -21,6 +21,8 @@ import java.util.Properties
 import kafka.utils.{Utils, ZKConfig}
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
+import kafka.metrics.KafkaMetricsConfigShared
+
 
 /**
  * Configuration settings for the kafka server
@@ -33,7 +35,7 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
   val hostName: String = Utils.getString(props, "hostname", null)
 
   /* the broker id for this server */
-  val brokerId: Int = Utils.getInt(props, "brokerid")
+  val brokerId: Int = Utils.getInt(props, "brokerid", 0)
   
   /* the SO_SNDBUF buffer of the socket server sockets */
   val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
@@ -136,7 +138,7 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
   val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500)
 
   /** minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
-  val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086)
+  val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4096)
 
   /* number of fetcher threads used to replicate messages from a source broker.
   *  Increasing this value can increase the degree of I/O parallelism in the follower broker. */
diff --git core/src/main/scala/kafka/server/RequestPurgatory.scala core/src/main/scala/kafka/server/RequestPurgatory.scala
index 83efe53..3376c7f 100644
--- core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -23,6 +23,9 @@ import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.network._
 import kafka.utils._
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
+
 
 /**
  * A request whose processing needs to be delayed for at most the given delayMs
@@ -30,6 +33,7 @@ import kafka.utils._
  * for example a key could be a (topic, partition) pair.
  */
 class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
+  val creationTimeNs = SystemTime.nanoseconds
   val satisfied = new AtomicBoolean(false)
 }
 
@@ -58,13 +62,41 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
  * this function handles delayed requests that have hit their time limit without being satisfied.
  *
  */
-abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) extends  Logging{
-  this.logIdent = logPrefix
+abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
+
   /* a list of requests watching each key */
-  private val watchersForKey = new ConcurrentHashMap[Any, Watchers]
+  private val watchersForKey = new Pool[Any, Watchers]
+
+  private val numDelayedRequestsBeanName = "NumDelayedRequests"
+  private val timeToSatisfyHistogramBeanName = "TimeToSatisfyInNs"
+  private val satisfactionRateBeanName = "SatisfactionRate"
+  private val expirationRateBeanName = "ExpirationRate"
+
+  override def metricsGroupIdent = ""
+
+  val satisfactionRateMeter = newMeter(
+    satisfactionRateBeanName,
+    "requests",
+    TimeUnit.SECONDS
+  )
+
+  val timeToSatisfyHistogram = newHistogram(timeToSatisfyHistogramBeanName)
+
+  newGauge(
+    numDelayedRequestsBeanName,
+    new Gauge[Int] {
+      def value() = expiredRequestReaper.unsatisfied.get()
+    }
+  )
+
+  val expirationRateMeter = newMeter(
+    expirationRateBeanName,
+    "requests",
+    TimeUnit.SECONDS
+  )
 
   /* background thread expiring requests that have been waiting too long */
-  private val expiredRequestReaper = new ExpiredRequestReaper(logPrefix)
+  private val expiredRequestReaper = new ExpiredRequestReaper
   private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
   expirationThread.start()
 
@@ -89,16 +121,9 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten
     else
       w.collectSatisfiedRequests(request)
   }
-
-  private def watchersFor(key: Any): Watchers = {
-    var lst = watchersForKey.get(key)
-    if(lst == null) {
-      watchersForKey.putIfAbsent(key, new Watchers)
-      lst = watchersForKey.get(key)
-    }
-    lst
-  }
-
+
+  private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key, new Watchers)
+
   /**
    * Check if this request satisfied this delayed request
    */
@@ -117,7 +142,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten
   }
 
   /**
-   * A linked list of DelayedRequests watching some key with some associated bookeeping logic
+   * A linked list of DelayedRequests watching some key with some associated
+   * bookkeeping logic.
    */
   private class Watchers {
 
@@ -132,10 +158,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten
 
     def add(t: T) {
       synchronized {
-                     requests.add(t)
-                     liveCount += 1
-                     maybePurge()
-                   }
+        requests.add(t)
+        liveCount += 1
+        maybePurge()
+      }
     }
 
     private def maybePurge() {
@@ -151,32 +177,39 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten
 
     def decLiveCount() {
       synchronized {
-                     liveCount -= 1
-                   }
+        liveCount -= 1
+      }
     }
 
     def collectSatisfiedRequests(request: R): Seq[T] = {
       val response = new mutable.ArrayBuffer[T]
       synchronized {
-                     val iter = requests.iterator()
-                     while(iter.hasNext) {
-                       val curr = iter.next
-                       if(curr.satisfied.get) {
-                         // another thread has satisfied this request, remove it
-                         iter.remove()
-                       } else {
-                         if(checkSatisfied(request, curr)) {
-                           iter.remove()
-                           val updated = curr.satisfied.compareAndSet(false, true)
-                           if(updated == true) {
-                             response += curr
-                             liveCount -= 1
-                             expiredRequestReaper.satisfyRequest()
-                           }
-                         }
-                       }
-                     }
-                   }
+        val iter = requests.iterator()
+        while(iter.hasNext) {
+          val curr = iter.next
+          if(curr.satisfied.get) {
+            // another thread has satisfied this request, remove it
+            iter.remove()
+          } else {
+            // synchronize on curr to avoid racing with the expiration thread,
+            // which also synchronizes on curr before expiring it
+            val satisfied = curr synchronized checkSatisfied(request, curr)
+            if(satisfied) {
+              iter.remove()
+              val updated = curr.satisfied.compareAndSet(false, true)
+              if(updated) {
+                val timeToSatisfyNs = SystemTime.nanoseconds - curr.creationTimeNs
+                satisfactionRateMeter.mark()
+                timeToSatisfyHistogram.update(timeToSatisfyNs)
+
+                response += curr
+                liveCount -= 1
+                expiredRequestReaper.satisfyRequest()
+              }
+            }
+          }
+        }
+      }
       response
     }
   }
@@ -184,8 +217,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten
   /**
   * Runnable to expire requests that have sat unfulfilled past their deadline
    */
-  private class ExpiredRequestReaper(logPrefix: String) extends Runnable with Logging {
-    this.logIdent = "ExpiredRequestReaper for " + logPrefix
+  private class ExpiredRequestReaper extends Runnable with Logging {
+    this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId)
 
     /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
     private val CleanupThresholdSize = 100
@@ -196,14 +229,16 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten
     private val shutdownLatch = new CountDownLatch(1)
     private val needsPurge = new AtomicBoolean(false)
     /* The count of elements in the delay queue that are unsatisfied */
-    private val unsatisfied = new AtomicInteger(0)
+    private[kafka] val unsatisfied = new AtomicInteger(0)
 
     /** Main loop for the expiry thread */
     def run() {
       while(running.get) {
         try {
           val curr = pollExpired()
-          expire(curr)
+          curr synchronized {
+            expire(curr)
+          }
         } catch {
           case ie: InterruptedException =>
             if(needsPurge.getAndSet(false)) {
@@ -232,11 +267,11 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten
 
     /** Shutdown the expiry thread*/
     def shutdown() {
-      debug("shutting down")
+      debug("Shutting down.")
       running.set(false)
       expirationThread.interrupt()
       shutdownLatch.await()
-      debug("shut down completely")
+      debug("Shut down complete.")
     }
 
     /** Record the fact that we satisfied a request in the stats for the expiry queue */
@@ -250,6 +285,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten
         val curr = delayed.take()
         val updated = curr.satisfied.compareAndSet(false, true)
         if(updated) {
+          expirationRateMeter.mark()
           unsatisfied.getAndDecrement()
           for(key <- curr.keys)
             watchersFor(key).decLiveCount()
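
The key invariant in the reworked purgatory: each DelayedRequest completes exactly once, either satisfied by collectSatisfiedRequests or expired by the reaper. Both paths now synchronize on the request and then race on satisfied.compareAndSet, so exactly one winner marks its meter and responds. A toy model of that single-completion guarantee (illustrative types, not the patch's):

    import java.util.concurrent.atomic.AtomicBoolean

    class ToyDelayed {
      val satisfied = new AtomicBoolean(false)

      // The satisfier and the expiry reaper both take the request's monitor
      // first, then race on the CAS; at most one of the two can win it.
      def trySatisfy(): Boolean = this synchronized { satisfied.compareAndSet(false, true) }
      def tryExpire(): Boolean  = this synchronized { satisfied.compareAndSet(false, true) }
    }

Whichever call returns true owns the request's completion; the CAS rules out both double counting in satisfactionRateMeter/expirationRateMeter and a double response to the client.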
diff --git core/src/main/scala/kafka/tools/TestMetrics.scala core/src/main/scala/kafka/tools/TestMetrics.scala
new file mode 100644
index 0000000..ce5f821
--- /dev/null
+++ core/src/main/scala/kafka/tools/TestMetrics.scala
@@ -0,0 +1,32 @@
+package kafka.tools
+
+
+import com.yammer.metrics.Metrics
+
+import util.Random
+import com.yammer.metrics.core.{Gauge, MetricName}
+import java.util.concurrent.atomic.AtomicInteger
+
+
+class TestMetrics // exists only to supply a Class object for metric scoping
+
+object TestMetrics {
+  def main(args: Array[String]) {
+    val i = new AtomicInteger(0)
+    val testHistogram = Metrics.newHistogram(classOf[TestMetrics], "testHistogram")
+    Metrics.newGauge(TestMetrics.getClass, "testGauge", new Gauge[Int] {
+      def value() = i.get()
+    })
+    Metrics.newGauge(TestMetrics.getClass, "testGauge", new Gauge[Int] { // same name again: returns the existing gauge
+      def value() = i.get()
+    })
+
+    val counter = Metrics.newCounter(TestMetrics.getClass, "testCounter")
+
+    while (true) {
+      i.set(Random.nextInt(1000))
+      testHistogram.update(i.get())
+      if (i.get() % 7 == 0) counter.inc()
+    }
+  }
+}
\ No newline at end of file
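
TestMetrics registers the name "testGauge" twice, apparently to probe duplicate registration: Yammer's registry is keyed by metric name, so the second newGauge call should hand back the gauge created by the first instead of adding a duplicate. A sketch of that getOrAdd-style lookup (an assumption about the library's behavior, not its actual code):

    import java.util.concurrent.ConcurrentHashMap

    val registry = new ConcurrentHashMap[String, AnyRef]

    def getOrAdd(name: String, metric: AnyRef): AnyRef = {
      val existing = registry.putIfAbsent(name, metric)
      if (existing == null) metric else existing  // repeat registrations return the original
    }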
diff --git core/src/main/scala/kafka/utils/Pool.scala core/src/main/scala/kafka/utils/Pool.scala
index d62fa77..2f0a9a9 100644
--- core/src/main/scala/kafka/utils/Pool.scala
+++ core/src/main/scala/kafka/utils/Pool.scala
@@ -27,14 +27,23 @@ class Pool[K,V] extends Iterable[(K, V)] {
   
   def this(m: collection.Map[K, V]) {
     this()
-    for((k,v) <- m.elements)
-      pool.put(k, v)
+    m.foreach(kv => pool.put(kv._1, kv._2))
   }
   
   def put(k: K, v: V) = pool.put(k, v)
   
   def putIfNotExists(k: K, v: V) = pool.putIfAbsent(k, v)
-  
+
+  /** Get the value for key, atomically installing valueIfAbsent if no mapping exists. */
+  def getAndMaybePut(key: K, valueIfAbsent: V) = {
+    val curr = pool.get(key)
+    if (curr == null) {
+      val updatedCurr = pool.putIfAbsent(key, valueIfAbsent)
+      if (updatedCurr == null) valueIfAbsent else updatedCurr
+    } else
+      curr
+  }
+
   def contains(id: K) = pool.containsKey(id)
   
   def get(key: K): V = pool.get(key)
@@ -46,7 +55,7 @@ class Pool[K,V] extends Iterable[(K, V)] {
   def values: Iterable[V] = 
     JavaConversions.asIterable(new ArrayList[V](pool.values()))
   
-  def clear: Unit = pool.clear()
+  def clear() { pool.clear() }
   
   override def size = pool.size
   
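
getAndMaybePut above is a double-checked read over ConcurrentHashMap.putIfAbsent: a plain get on the fast path, and on a miss the putIfAbsent return value disambiguates whether we or a racing thread installed the value. One caveat: valueIfAbsent is a strict parameter, so callers (e.g. the DelayedFetchRequestMetrics construction earlier in this patch) build the value even when the key already exists. A by-name variant would avoid that cost; a sketch, not part of this patch:

    import java.util.concurrent.ConcurrentHashMap

    def getAndMaybePutLazy[K, V <: AnyRef](pool: ConcurrentHashMap[K, V], key: K, valueIfAbsent: => V): V = {
      val curr = pool.get(key)
      if (curr != null)
        curr                                // fast path: valueIfAbsent is never evaluated
      else {
        val v = valueIfAbsent               // evaluated at most once, only on a miss
        val prev = pool.putIfAbsent(key, v)
        if (prev == null) v else prev
      }
    }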
diff --git core/src/main/scala/kafka/utils/Utils.scala core/src/main/scala/kafka/utils/Utils.scala
index bdbc6b1..a8b7701 100644
--- core/src/main/scala/kafka/utils/Utils.scala
+++ core/src/main/scala/kafka/utils/Utils.scala
@@ -501,7 +501,7 @@ object Utils extends Logging {
    * instead it just returns false indicating the registration failed.
    * @param mbean The object to register as an mbean
    * @param name The name to register this mbean with
-   * @returns true if the registration succeeded
+   * @return true if the registration succeeded
    */
   def registerMBean(mbean: Object, name: String): Boolean = {
     try {
diff --git core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 66d641a..b99eaa5 100644
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -103,8 +103,10 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     val logManager = EasyMock.createMock(classOf[LogManager])
     val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
     val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
+    EasyMock.expect(replicaManager.config).andReturn(configs.head)
     EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
     EasyMock.expect(logManager.config).andReturn(configs.head)
+    EasyMock.replay(replicaManager)
     EasyMock.replay(logManager)
     EasyMock.replay(kafkaZookeeper)
 
diff --git core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
index 6237432..a3a506a 100644
--- core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
+++ core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
@@ -18,26 +18,27 @@
 package kafka.server
 
 import scala.collection._
-import org.junit.{After, Before, Test}
+import org.junit.Test
 import junit.framework.Assert._
 import kafka.message._
 import kafka.api._
 import kafka.utils.TestUtils
+import org.scalatest.junit.JUnit3Suite
 
-class RequestPurgatoryTest {
+class RequestPurgatoryTest extends JUnit3Suite {
 
   val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello1".getBytes)))
   val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello2".getBytes)))
   var purgatory: MockRequestPurgatory = null
   
-  @Before
-  def setup() {
+  override def setUp() {
+    super.setUp()
     purgatory = new MockRequestPurgatory()
   }
   
-  @After
-  def teardown() {
+  override def tearDown() {
     purgatory.shutdown()
+    super.tearDown()
   }
 
   @Test
@@ -54,7 +55,7 @@ class RequestPurgatoryTest {
     assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size)
     purgatory.satisfied += r2
     assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2))
-    assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)  
+    assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
   }
 
   @Test
@@ -73,7 +74,8 @@ class RequestPurgatoryTest {
     assertTrue("Time for expiration was about 20ms", (elapsed - expiration).abs < 10L)
   }
   
-  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest]("Mock Request Purgatory") {
+  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
+    val metricsLabel = "MockRequestPurgatoryMetrics"
     val satisfied = mutable.Set[DelayedRequest]()
     val expired = mutable.Set[DelayedRequest]()
     def awaitExpiration(delayed: DelayedRequest) = {
diff --git perf/src/main/scala/kafka/perf/ProducerPerformance.scala perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 0e1ff7a..559fd91 100644
--- perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -180,6 +180,7 @@ object ProducerPerformance extends Logging {
     props.put("compression.codec", config.compressionCodec.codec.toString)
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
     props.put("buffer.size", (64*1024).toString)
+    props.put("producer.request.required.acks", 2.toString) // require acks from two replicas before a produce succeeds
     if(config.isAsync) {
       props.put("producer.type","async")
       props.put("batch.size", config.batchSize.toString)
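
The added property pins the perf producer to producer.request.required.acks=2, so each produce waits for acknowledgement from two replicas, exercising the new produce-purgatory path under load. If runs ever need different ack levels, a system-property override would be a small extension (hypothetical, not in this patch):

    import java.util.Properties

    val props = new Properties()
    // Hypothetical knob: let -Dkafka.perf.requiredAcks=N override the default of 2.
    val requiredAcks = Option(System.getProperty("kafka.perf.requiredAcks")).getOrElse("2")
    props.put("producer.request.required.acks", requiredAcks)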
diff --git project/build/KafkaProject.scala project/build/KafkaProject.scala
index 5ad1739..396843b 100644
--- project/build/KafkaProject.scala
+++ project/build/KafkaProject.scala
@@ -60,11 +60,11 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
 
     def zkClientDep =
       <dependency>
-       <groupId>zkclient</groupId>
-       <artifactId>zkclient</artifactId>
-       <version>20120522</version>
-       <scope>compile</scope>
-       </dependency>
+        <groupId>zkclient</groupId>
+        <artifactId>zkclient</artifactId>
+        <version>20120522</version>
+        <scope>compile</scope>
+      </dependency>
 
     object ZkClientDepAdder extends RuleTransformer(new RewriteRule() {
       override def transform(node: Node): Seq[Node] = node match {
@@ -251,6 +251,11 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
   trait CoreDependencies {
     val log4j = "log4j" % "log4j" % "1.2.15"
     val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
+    private val metricsVersion = "latest.release" // floating version: resolves to the newest published release at build time
+    val metricsCore = "com.yammer.metrics" % "metrics-core" % metricsVersion
+    val metricsGraphite = "com.yammer.metrics" % "metrics-graphite" % metricsVersion
+    val metricsGanglia = "com.yammer.metrics" % "metrics-ganglia" % metricsVersion
+    val slf4jSimple = "org.slf4j" % "slf4j-simple" % "latest.release"
   }
   
   trait HadoopDependencies {
@@ -264,5 +269,4 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
   trait CompressionDependencies {
     val snappy = "org.xerial.snappy" % "snappy-java" % "1.0.4.1"	
   }
-
 }
