Index: system_test/broker_failure/bin/run-test.sh
===================================================================
--- system_test/broker_failure/bin/run-test.sh	(revision 1297646)
+++ system_test/broker_failure/bin/run-test.sh	(working copy)
@@ -68,7 +68,7 @@
 
 readonly num_msg_per_batch=500
 readonly batches_per_iteration=5
-readonly num_iterations=1
+readonly num_iterations=20
 
 readonly zk_source_port=2181
 readonly zk_mirror_port=2182
Index: core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala	(revision 1297646)
+++ core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala	(working copy)
@@ -36,6 +36,7 @@
       props.put("buffer.size", "65536")
       props.put("connect.timeout.ms", "100000")
       props.put("reconnect.interval", "10000")
+      props.put("max.message.size", "10000000")
       producer = new SyncProducer(new SyncProducerConfig(props))
       consumer = new SimpleConsumer(host,
                                    port,
Index: core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala	(revision 1297646)
+++ core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala	(working copy)
@@ -33,7 +33,7 @@
  */
 class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with kafka.integration.KafkaServerTestHarness {
   
-  val port = 9999
+  val port = TestUtils.choosePort()
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props) {
                  override val enableZookeeper = false
Index: core/src/main/scala/kafka/utils/Utils.scala
===================================================================
--- core/src/main/scala/kafka/utils/Utils.scala	(revision 1297646)
+++ core/src/main/scala/kafka/utils/Utils.scala	(working copy)
@@ -24,17 +24,18 @@
 import java.lang.management._
 import java.util.zip.CRC32
 import javax.management._
-import java.util.Properties
 import scala.collection._
 import scala.collection.mutable
-import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
+import kafka.message.{Message, NoCompressionCodec, CompressionCodec}
+import java.util.{Random, Properties}
 
 /**
  * Helper functions!
  */
 object Utils extends Logging {
-  
+  val random = new Random
+
   /**
    * Wrap the given function in a java.lang.Runnable
    * @param fun A function
@@ -657,6 +658,18 @@
       case _ => // swallow
     }
   }
+
+  def getMessageOfSize(messageSize: Int): Message = {
+    val messageBytes = getByteArrayOfLength(messageSize)
+    Utils.random.nextBytes(messageBytes)
+    new Message(messageBytes)
+  }
+
+  def getByteArrayOfLength(len: Int): Array[Byte] = {
+    // A zero-length request still gets a small (5-byte) payload so the message is non-empty
+    new Array[Byte](if (len == 0) 5 else len)
+  }
+
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala	(revision 1297646)
+++ core/src/main/scala/kafka/producer/Producer.scala	(working copy)
@@ -22,7 +22,7 @@
 import java.util.Properties
 import kafka.cluster.{Partition, Broker}
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
+import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException}
 import kafka.api.ProducerRequest
 
 class Producer[K,V](config: ProducerConfig,
Index: core/src/main/scala/kafka/message/CompressionUtils.scala
===================================================================
--- core/src/main/scala/kafka/message/CompressionUtils.scala	(revision 1297646)
+++ core/src/main/scala/kafka/message/CompressionUtils.scala	(working copy)
@@ -17,11 +17,9 @@
 
 package kafka.message
 
-import java.io.ByteArrayOutputStream
-import java.io.IOException
-import java.io.InputStream
 import java.nio.ByteBuffer
 import kafka.utils._
+import java.io.{EOFException, ByteArrayOutputStream, IOException, InputStream}
 
 abstract sealed class CompressionFacade(inputStream: InputStream, outputStream: ByteArrayOutputStream) {
   def close() = {
@@ -32,7 +30,8 @@
   def write(a: Array[Byte])
 }
 
-class GZIPCompression(inputStream: InputStream, outputStream: ByteArrayOutputStream)  extends CompressionFacade(inputStream,outputStream) {
+class GZIPCompression(inputStream: InputStream, outputStream: ByteArrayOutputStream)
+  extends CompressionFacade(inputStream,outputStream) with Logging {
   import java.util.zip.GZIPInputStream
   import java.util.zip.GZIPOutputStream
   val gzipIn:GZIPInputStream = if (inputStream == null) null else new  GZIPInputStream(inputStream)
@@ -49,11 +48,19 @@
   }
 
   override def read(a: Array[Byte]): Int = {
-    gzipIn.read(a)
+    var numBytesDecompressed: Int = -1
+    try {
+      numBytesDecompressed = gzipIn.read(a)
+    } catch {
+      case e: EOFException => warn("Ignoring EOFException during decompression", e)
+        numBytesDecompressed = -1
+    }
+    numBytesDecompressed
   }
 }
 
-class SnappyCompression(inputStream: InputStream,outputStream: ByteArrayOutputStream)  extends CompressionFacade(inputStream,outputStream) {
+class SnappyCompression(inputStream: InputStream,outputStream: ByteArrayOutputStream)
+  extends CompressionFacade(inputStream,outputStream) with Logging {
   import org.xerial.snappy.SnappyInputStream
   import org.xerial.snappy.SnappyOutputStream
   
@@ -71,9 +78,15 @@
   }
 
   override def read(a: Array[Byte]): Int = {
-    snappyIn.read(a)	
+    var numBytesDecompressed: Int = -1
+    try {
+      numBytesDecompressed = snappyIn.read(a)
+    } catch {
+      case e: EOFException => warn("Ignoring EOFException during decompression", e)
+        numBytesDecompressed = -1
+    }
+    numBytesDecompressed
   }
-
 }
 
 object CompressionFactory {
Index: perf/src/main/scala/kafka/perf/ProducerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(revision 1297646)
+++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(working copy)
@@ -24,7 +24,7 @@
 import kafka.message.{CompressionCodec, Message}
 import java.text.SimpleDateFormat
 import java.util.{Random, Properties}
-import kafka.utils.Logging
+import kafka.utils.{Utils, Logging}
 
 /**
  * Load test for the producer
@@ -121,18 +121,6 @@
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
   }
 
-  private def getStringOfLength(len: Int) : String = {
-    val strArray = new Array[Char](len)
-    for (i <- 0 until len)
-      strArray(i) = 'x'
-    return new String(strArray)
-  }
-
-  private def getByteArrayOfLength(len: Int): Array[Byte] = {
-    //new Array[Byte](len)
-    new Array[Byte]( if (len == 0) 5 else len )
-  }
-
   class ProducerThread(val threadId: Int,
                        val config: ProducerPerfConfig,
                        val totalBytesSent: AtomicLong,
@@ -181,7 +169,7 @@
         if (!config.isFixSize) {
           for(k <- 0 until config.batchSize) {
             strLength = rand.nextInt(config.messageSize)
-            val message = new Message(getByteArrayOfLength(strLength))
+            val message = new Message(Utils.getByteArrayOfLength(strLength))
             messageSet ::= message
             bytesSent += message.payloadSize
           }
@@ -196,9 +184,7 @@
           }else {
             if(!config.isFixSize) {
               strLength = rand.nextInt(config.messageSize)
-              val messageBytes = getByteArrayOfLength(strLength)
-              rand.nextBytes(messageBytes)
-              val message = new Message(messageBytes)
+              val message = Utils.getMessageOfSize(strLength)
               producer.send(new ProducerData[Message,Message](config.topic, message))
               debug(config.topic + "-checksum:" + message.checksum)
               bytesSent += message.payloadSize
