Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1157460)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -35,7 +35,7 @@
  */
 class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness {
   
-  val port = 9999
+  val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props) {
                  override val enableZookeeper = false
@@ -47,7 +47,7 @@
     val topic = "test-topic"
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", "0:localhost:9999")
+    props.put("broker.list", "0:localhost:" + port)
     val config = new ProducerConfig(props)
 
     val stringProducer1 = new Producer[String, String](config)
@@ -67,7 +67,7 @@
     val topic = "test-topic"
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("broker.list", "0:localhost:9999")
+    props.put("broker.list", "0:localhost:" + port)
     props.put("compression", "true")
     val config = new ProducerConfig(props)
 
Index: core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala	(revision 1157460)
+++ core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala	(working copy)
@@ -17,7 +17,7 @@
 
 class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness with ZooKeeperTestHarness {
   val zkConnect = TestZKUtils.zookeeperConnect  
-  val port = 9999
+  val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props) {
                  override val hostName = "localhost"
Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(revision 1157460)
+++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(working copy)
@@ -32,7 +32,7 @@
  */
 class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness   {
 
-  val port = 9999
+  val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props) {
                  override val enableZookeeper = false
Index: core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala	(revision 1157460)
+++ core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala	(working copy)
@@ -34,7 +34,7 @@
   val testConsumer = "consumer"
   val kafkaProps = new Properties
   val host = "localhost"
-  val port = 9892
+  val port = TestUtils.choosePort
   val loader = getClass.getClassLoader
   val kafkaLogDir = loader.getResource("test-kafka-logs")
   kafkaProps.put("brokerid", "12")
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(revision 1157460)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(working copy)
@@ -36,7 +36,7 @@
 
   @Before
   def setUp() {
-    server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, 9092))
+    server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.choosePort))
     {
       override val enableZookeeper = false
     })
@@ -51,7 +51,7 @@
   def testUnreachableServer() {
     val props = new Properties()
     props.put("host", "NOT_USED")
-    props.put("port", "9092")
+    props.put("port", server.socketServer.port.toString)
     props.put("buffer.size", "102400")
     props.put("connect.timeout.ms", "300")
     props.put("reconnect.interval", "1000")
@@ -77,11 +77,10 @@
       producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
     }catch {
       case e: Exception => failed = true
-
     }
     val secondEnd = SystemTime.milliseconds
     println("Second message send retries took " + (secondEnd-secondStart) + " ms")
-    Assert.assertTrue((secondEnd-secondEnd) < 300)
+    Assert.assertTrue((secondEnd-secondStart) < 300)
     simpleProducerLogger.setLevel(Level.ERROR)
   }
 
@@ -89,7 +88,7 @@
   def testReachableServer() {
     val props = new Properties()
     props.put("host", "localhost")
-    props.put("port", "9092")
+    props.put("port", server.socketServer.port.toString)
     props.put("buffer.size", "102400")
     props.put("connect.timeout.ms", "500")
     props.put("reconnect.interval", "1000")
@@ -113,7 +112,7 @@
     }
     Assert.assertFalse(failed)
     val secondEnd = SystemTime.milliseconds
-    Assert.assertTrue((secondEnd-secondEnd) < 500)
+    Assert.assertTrue((secondEnd-secondStart) < 500)
 
     try {
       producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))))
@@ -127,7 +126,7 @@
   def testReachableServerWrongPort() {
     val props = new Properties()
     props.put("host", "localhost")
-    props.put("port", "9091")
+    props.put("port", (server.socketServer.port + 1).toString) // the wrong port
     props.put("buffer.size", "102400")
     props.put("connect.timeout.ms", "300")
     props.put("reconnect.interval", "500")
@@ -153,7 +152,7 @@
     }
     Assert.assertTrue(failed)
     val secondEnd = SystemTime.milliseconds
-    Assert.assertTrue((secondEnd-secondEnd) < 300)
+    Assert.assertTrue((secondEnd-secondStart) < 300)
     simpleProducerLogger.setLevel(Level.ERROR)
   }
 
@@ -161,7 +160,7 @@
   def testMessageSizeTooLarge() {
     val props = new Properties()
     props.put("host", "localhost")
-    props.put("port", "9091")
+    props.put("port", server.socketServer.port.toString)
     props.put("buffer.size", "102400")
     props.put("connect.timeout.ms", "300")
     props.put("reconnect.interval", "500")
Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1157460)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -40,8 +40,8 @@
   private val topic = "test-topic"
   private val brokerId1 = 0
   private val brokerId2 = 1  
-  private val port1 = 9098
-  private val port2 = 9099
+  private val ports = TestUtils.choosePorts(2)
+  private val (port1, port2) = (ports(0), ports(1))
   private var server1: KafkaServer = null
   private var server2: KafkaServer = null
   private var producer1: SyncProducer = null
@@ -605,7 +605,8 @@
     val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
     val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
 
-    val serverProps = TestUtils.createBrokerConfig(2, 9094)
+    val port = TestUtils.choosePort
+    val serverProps = TestUtils.createBrokerConfig(2, port)
     val serverConfig = new KafkaConfig(serverProps) {
       override val numPartitions = 4
     }
@@ -615,7 +616,7 @@
     // send a message to the new broker to register it under topic "test-topic"
     val tempProps = new Properties()
     tempProps.put("host", "localhost")
-    tempProps.put("port", "9094")
+    tempProps.put("port", port.toString)
     val tempProducer = new SyncProducer(new SyncProducerConfig(tempProps))
     tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                              messages = new Message("test".getBytes())))
Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(revision 1157460)
+++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(working copy)
@@ -31,7 +31,7 @@
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 
 class ServerShutdownTest extends JUnitSuite {
-  val port = 9999
+  val port = TestUtils.choosePort
 
   @Test
   def testCleanShutdown() {
