Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1300670)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -26,10 +26,10 @@
 import org.apache.log4j.{Level, Logger}
 import org.junit.Test
 import kafka.utils.{TestZKUtils, Utils, TestUtils}
-import kafka.message.Message
 import kafka.admin.CreateTopicCommand
 import kafka.api.FetchRequestBuilder
 import org.junit.Assert._
+import kafka.message.{FileMessageSet, Message}
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   private val brokerId1 = 0
@@ -126,8 +126,8 @@
 
   // TODO: Need to rewrite when SyncProducer changes to throw timeout exceptions
   //       and when leader logic is changed.
-  @Test
-  def testZKSendWithDeadBroker() {
+//  @Test
+//  def testZKSendWithDeadBroker() {
 //    val props = new Properties()
 //    props.put("serializer.class", "kafka.serializer.StringEncoder")
 //    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
@@ -172,6 +172,55 @@
 //      case e: Exception => fail("Not expected", e)
 //    }
 //    producer.close
+//  }
+
+  @Test
+  def testZKSendWithDeadBroker() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    // create topic
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 1, "0")
+
+    val config = new ProducerConfig(props)
+
+    val producer = new Producer[String, String](config)
+    val message = new Message("test1".getBytes)
+    try {
+
+      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
+      // all partitions have broker 0 as the leader.
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      Thread.sleep(100)
+
+      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      Thread.sleep(3000)
+
+      val log = server1.getLogManager().getLog("new-topic", 0)
+      val fileMessageSet = log.segments.contents.get().head.messageSet
+      var numMessagesWritten = 0
+      for(messageAndOffset <- fileMessageSet) {
+        println("Received message at offset %d".format(messageAndOffset.offset))
+        numMessagesWritten += 1
+      }
+      println("broker %d received %d messages".format(server1.config.brokerId, numMessagesWritten))
+      // cross check if brokers got the messages
+      val response = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      val messageSet = response.messageSet("new-topic", 0).iterator
+      var numMessagesReceived = 0
+      while(messageSet.hasNext) {
+        val messageAndOffset = messageSet.next()
+        assertEquals(message, messageAndOffset.message)
+        println("Received message at offset %d".format(messageAndOffset.offset))
+        numMessagesReceived += 1
+      }
+      assertEquals("Message set should have 2 messages", 2, numMessagesReceived)
+    } catch {
+      case e: Exception => fail("Not expected", e)
+    }
+    producer.close()
   }
 
   @Test
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1300670)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -112,7 +112,7 @@
   private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
 
   /* The actual segments of the log */
-  private[log] val segments: SegmentList[LogSegment] = loadSegments()
+  val segments: SegmentList[LogSegment] = loadSegments()
 
   /* The name of this log */
   val name  = dir.getName()
