From f9b8b3b7eb22ea0af8a24a4b9fec739965913c29 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava <me@ewencp.org>
Date: Wed, 29 Oct 2014 13:48:18 -0700
Subject: [PATCH] KAFKA-1501 Ensure tests allocate all ports simultaneously to
 ensure ports won't be reused.

---
 .../kafka/api/ProducerCompressionTest.scala        |  9 +++---
 .../kafka/api/ProducerFailureHandlingTest.scala    |  9 +++---
 .../integration/kafka/api/ProducerSendTest.scala   |  9 +++---
 .../scala/unit/kafka/admin/AddPartitionsTest.scala | 21 +++++++------
 .../test/scala/unit/kafka/admin/AdminTest.scala    | 35 +++++++++++++++-------
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   | 19 ++++++++----
 .../scala/unit/kafka/admin/TopicCommandTest.scala  |  7 ++---
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |  7 ++---
 .../consumer/ZookeeperConsumerConnectorTest.scala  |  7 ++---
 .../kafka/integration/AutoOffsetResetTest.scala    |  8 ++---
 .../scala/unit/kafka/integration/FetcherTest.scala |  7 ++---
 .../kafka/integration/KafkaServerTestHarness.scala |  5 ++--
 .../unit/kafka/integration/PrimitiveApiTest.scala  |  9 +++---
 .../integration/ProducerConsumerTestHarness.scala  |  5 ++--
 .../unit/kafka/integration/RollingBounceTest.scala | 21 +++++++------
 .../unit/kafka/integration/TopicMetadataTest.scala |  7 ++---
 .../integration/UncleanLeaderElectionTest.scala    | 13 ++++----
 .../consumer/ZookeeperConsumerConnectorTest.scala  |  8 ++---
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  3 +-
 .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala  |  9 +++---
 .../unit/kafka/network/SocketServerTest.scala      |  2 +-
 .../unit/kafka/producer/AsyncProducerTest.scala    |  3 +-
 .../scala/unit/kafka/producer/ProducerTest.scala   |  9 +++---
 .../unit/kafka/producer/SyncProducerTest.scala     |  7 ++---
 .../unit/kafka/server/AdvertiseBrokerTest.scala    | 15 +++++-----
 .../kafka/server/DynamicConfigChangeTest.scala     |  5 ++--
 .../server/HighwatermarkPersistenceTest.scala      |  3 +-
 .../unit/kafka/server/ISRExpirationTest.scala      |  3 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 31 +++++++++----------
 .../unit/kafka/server/LeaderElectionTest.scala     | 16 +++++-----
 .../scala/unit/kafka/server/LogOffsetTest.scala    |  3 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |  8 ++---
 .../scala/unit/kafka/server/OffsetCommitTest.scala |  5 ++--
 .../scala/unit/kafka/server/ReplicaFetchTest.scala |  7 ++---
 .../unit/kafka/server/ReplicaManagerTest.scala     |  6 ++--
 .../unit/kafka/server/ServerShutdownTest.scala     | 13 ++++----
 .../unit/kafka/server/ServerStartupTest.scala      | 17 +++++------
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |  3 +-
 .../unit/kafka/utils/NetworkTestHarness.scala      | 29 ++++++++++++++++++
 .../unit/kafka/utils/ReplicationUtilsTest.scala    |  5 ++--
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 21 ++++++-------
 .../test/scala/unit/kafka/zk/ZKEphemeralTest.scala |  6 ++--
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  8 ++---
 43 files changed, 231 insertions(+), 212 deletions(-)
 create mode 100644 core/src/test/scala/unit/kafka/utils/NetworkTestHarness.scala

diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 6379f2b..dd9750e 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -19,7 +19,6 @@ package kafka.api.test
 
 import java.util.{Properties, Collection, ArrayList}
 
-import org.scalatest.junit.JUnit3Suite
 import org.junit.runners.Parameterized
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized.Parameters
@@ -32,18 +31,18 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{NetworkTestHarness, Utils, TestUtils}
 
 import scala.Array
 
 
 @RunWith(value = classOf[Parameterized])
-class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness {
+class ProducerCompressionTest(compression: String) extends NetworkTestHarness with ZooKeeperTestHarness {
   private val brokerId = 0
-  private val port = TestUtils.choosePort
+  private val port = getPort()
   private var server: KafkaServer = null
 
-  private val props = TestUtils.createBrokerConfig(brokerId, port)
+  private val props = TestUtils.createBrokerConfig(brokerId, port, zkPort)
   private val config = new KafkaConfig(props)
 
   private val topic = "topic"
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 209a409..d978e6d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -19,7 +19,6 @@ package kafka.api
 
 import kafka.common.Topic
 import org.apache.kafka.common.errors.{InvalidTopicException,NotEnoughReplicasException}
-import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._
 
@@ -28,22 +27,22 @@ import java.lang.Integer
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 
 import kafka.server.KafkaConfig
-import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils}
+import kafka.utils.{NetworkTestHarness, TestZKUtils, ShutdownableThread, TestUtils}
 import kafka.integration.KafkaServerTestHarness
 import kafka.consumer.SimpleConsumer
 
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.clients.producer._
 
-class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarness {
+class ProducerFailureHandlingTest extends NetworkTestHarness with KafkaServerTestHarness {
   private val producerBufferSize = 30000
   private val serverMessageMaxBytes =  producerBufferSize/2
 
   val numServers = 2
   val configs =
-    for(props <- TestUtils.createBrokerConfigs(numServers, false))
+    for(props <- TestUtils.createBrokerConfigs(getPorts(numServers), zkPort, false))
     yield new KafkaConfig(props) {
-      override val zkConnect = TestZKUtils.zookeeperConnect
+      override val zkConnect = ProducerFailureHandlingTest.this.zkConnect
       override val autoCreateTopicsEnable = false
       override val messageMaxBytes = serverMessageMaxBytes
     }
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index d407af9..ea8743a 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -20,24 +20,23 @@ package kafka.api.test
 import java.lang.{Integer, IllegalArgumentException}
 
 import org.apache.kafka.clients.producer._
-import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._
 
 import kafka.server.KafkaConfig
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils.{NetworkTestHarness, TestZKUtils, TestUtils}
 import kafka.consumer.SimpleConsumer
 import kafka.api.FetchRequestBuilder
 import kafka.message.Message
 import kafka.integration.KafkaServerTestHarness
 
 
-class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
+class ProducerSendTest extends NetworkTestHarness with KafkaServerTestHarness {
   val numServers = 2
   val configs =
-    for(props <- TestUtils.createBrokerConfigs(numServers, false))
+    for(props <- TestUtils.createBrokerConfigs(getPorts(numServers), zkPort, false))
     yield new KafkaConfig(props) {
-      override val zkConnect = TestZKUtils.zookeeperConnect
+      override val zkConnect = ProducerSendTest.this.zkConnect
       override val numPartitions = 4
     }
 
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 1bf2667..959bb3a 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,30 +17,29 @@
 
 package kafka.admin
 
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.utils.{NetworkTestHarness, ZkUtils, Utils, TestUtils}
 import kafka.cluster.Broker
 import kafka.client.ClientUtils
 import kafka.server.{KafkaConfig, KafkaServer}
 
-class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
+class AddPartitionsTest extends NetworkTestHarness with ZooKeeperTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
   val brokerId3 = 2
   val brokerId4 = 3
 
-  val port1 = TestUtils.choosePort()
-  val port2 = TestUtils.choosePort()
-  val port3 = TestUtils.choosePort()
-  val port4 = TestUtils.choosePort()
+  val port1 = getPort()
+  val port2 = getPort()
+  val port3 = getPort()
+  val port4 = getPort()
 
-  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
-  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
-  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, false)
-  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, false)
+  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, zkPort, false)
+  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, zkPort, false)
+  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, zkPort, false)
+  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, zkPort, false)
 
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
   var brokers: Seq[Broker] = Seq.empty[Broker]
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index e289798..1c494cb 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -18,7 +18,6 @@ package kafka.admin
 
 import junit.framework.Assert._
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
 import kafka.utils._
 import kafka.log._
@@ -30,7 +29,7 @@ import java.io.File
 import TestUtils._
 
 
-class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+class AdminTest extends NetworkTestHarness with ZooKeeperTestHarness with Logging {
 
   @Test
   def testReplicaAssignment() {
@@ -145,7 +144,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val ports = getPorts(4)
+    val servers = TestUtils.createBrokerConfigs(ports, zkPort, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -169,6 +169,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
     servers.foreach(_.shutdown())
+    freePorts(ports)
   }
 
   @Test
@@ -176,7 +177,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val ports = getPorts(4)
+    val servers = TestUtils.createBrokerConfigs(ports, zkPort, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -200,6 +202,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
                             "New replicas should exist on brokers")
 
     servers.foreach(_.shutdown())
+    freePorts(ports)
   }
 
   @Test
@@ -207,7 +210,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val ports = getPorts(4)
+    val servers = TestUtils.createBrokerConfigs(ports, zkPort, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -230,13 +234,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
     servers.foreach(_.shutdown())
+    freePorts(ports)
   }
 
   @Test
   def testReassigningNonExistingPartition() {
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val ports = getPorts(4)
+    val servers = TestUtils.createBrokerConfigs(ports, zkPort, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
@@ -246,6 +252,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient)
     assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
     servers.foreach(_.shutdown())
+    freePorts(ports)
   }
 
   @Test
@@ -262,7 +269,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
     reassignPartitionsCommand.reassignPartitions
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val ports = getPorts(2)
+    val servers = TestUtils.createBrokerConfigs(ports, zkPort, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
 
     // wait until reassignment completes
     TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient),
@@ -275,6 +283,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet,
                             "New replicas should exist on brokers")
     servers.foreach(_.shutdown())
+    freePorts(ports)
   }
 
   @Test
@@ -298,7 +307,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val partition = 1
     val preferredReplica = 0
     // create brokers
-    val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_))
+    val ports = getPorts(3)
+    val serverConfigs = TestUtils.createBrokerConfigs(ports, zkPort, false).map(new KafkaConfig(_))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
@@ -310,6 +320,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader)).get
     assertEquals("Preferred replica election failed", preferredReplica, newLeader)
     servers.foreach(_.shutdown())
+    freePorts(ports)
   }
 
   @Test
@@ -318,7 +329,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val topic = "test"
     val partition = 1
     // create brokers
-    val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_))
+    val ports = getPorts(3)
+    val serverConfigs = TestUtils.createBrokerConfigs(ports, zkPort, false).map(new KafkaConfig(_))
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
     // create the topic
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
@@ -354,6 +366,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
     finally {
       servers.foreach(_.shutdown())
+      freePorts(ports)
     }
   }
 
@@ -365,7 +378,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   def testTopicConfigChange() {
     val partitions = 3
     val topic = "my-topic"
-    val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+    val port = getPort()
+    val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, port, zkPort)))
 
     def makeConfig(messageSize: Int, retentionMs: Long) = {
       var props = new Properties()
@@ -398,6 +412,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     } finally {
       server.shutdown()
       server.config.logDirs.map(Utils.rm(_))
+      freePort(port)
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 29cc01b..888f5ef 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -16,10 +16,9 @@
  */
 package kafka.admin
 
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
-import kafka.utils.{ZkUtils, TestUtils}
+import kafka.utils.{NetworkTestHarness, ZkUtils, TestUtils}
 import kafka.server.{KafkaServer, KafkaConfig}
 import org.junit.Test
 import kafka.common._
@@ -31,7 +30,7 @@ import kafka.producer.KeyedMessage
 import kafka.common.TopicAndPartition
 import kafka.api.PartitionOffsetRequestInfo
 
-class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
+class DeleteTopicTest extends NetworkTestHarness with ZooKeeperTestHarness {
 
   @Test
   def testDeleteTopicWithAllAliveReplicas() {
@@ -42,6 +41,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     AdminUtils.deleteTopic(zkClient, topic)
     verifyTopicDeletion(topic, servers)
     servers.foreach(_.shutdown())
+    freePorts(servers.map(_.socketServer.port).toList)
   }
 
   @Test
@@ -67,6 +67,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     follower.startup()
     verifyTopicDeletion(topic, servers)
     servers.foreach(_.shutdown())
+    freePorts(servers.map(_.socketServer.port).toList)
   }
 
   @Test
@@ -94,6 +95,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     verifyTopicDeletion(topic, servers)
     servers.foreach(_.shutdown())
+    freePorts(servers.map(_.socketServer.port).toList)
   }
 
   @Test
@@ -101,7 +103,8 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
     val topicAndPartition = TopicAndPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(4, false)
+    val ports = getPorts(4)
+    val brokerConfigs = TestUtils.createBrokerConfigs(ports, zkPort, false)
     brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
     val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
@@ -140,6 +143,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     follower.startup()
     verifyTopicDeletion(topic, servers)
     allServers.foreach(_.shutdown())
+    freePorts(ports)
   }
 
   @Test
@@ -163,6 +167,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
       servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty),
       "Replica logs not for new partition [test,1] not deleted after delete topic is complete.")
     servers.foreach(_.shutdown())
+    freePorts(servers.map(_.socketServer.port).toList)
   }
 
   @Test
@@ -180,6 +185,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty))
     servers.foreach(_.shutdown())
+    freePorts(servers.map(_.socketServer.port).toList)
   }
 
   @Test
@@ -200,6 +206,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created.")
     servers.foreach(_.shutdown())
+    freePorts(servers.map(_.socketServer.port).toList)
   }
 
   @Test
@@ -221,13 +228,13 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
     servers.foreach(_.shutdown())
-
+    freePorts(servers.map(_.socketServer.port).toList)
   }
 
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
+    val brokerConfigs = TestUtils.createBrokerConfigs(getPorts(3), zkPort, false)
     brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index ac6dd20..1ca42c7 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -18,15 +18,12 @@ package kafka.admin
 
 import junit.framework.Assert._
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
-import kafka.utils.Logging
-import kafka.utils.TestUtils
+import kafka.utils.{NetworkTestHarness, Logging, TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.server.KafkaConfig
 import kafka.admin.TopicCommand.TopicCommandOptions
-import kafka.utils.ZkUtils
 
-class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+class TopicCommandTest extends NetworkTestHarness with ZooKeeperTestHarness with Logging {
 
   @Test
   def testConfigPreservationAcrossPartitionAlteration() {
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 151ba7c..b5d5ea0 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -30,16 +30,15 @@ import kafka.utils._
 import org.junit.Test
 import kafka.serializer._
 import kafka.cluster.{Broker, Cluster}
-import org.scalatest.junit.JUnit3Suite
 import kafka.integration.KafkaServerTestHarness
 
-class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
+class ConsumerIteratorTest extends NetworkTestHarness with KafkaServerTestHarness {
 
   val numNodes = 1
   val configs =
-    for(props <- TestUtils.createBrokerConfigs(numNodes))
+    for(props <- TestUtils.createBrokerConfigs(getPorts(numNodes), zkPort))
     yield new KafkaConfig(props) {
-      override val zkConnect = TestZKUtils.zookeeperConnect
+      override val zkConnect = ConsumerIteratorTest.this.zkConnect
     }
   val messages = new mutable.HashMap[Int, Seq[Message]]
   val topic = "topic"
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index e1d8711..678b3a3 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -21,7 +21,6 @@ import junit.framework.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import scala.collection._
-import org.scalatest.junit.JUnit3Suite
 import kafka.message._
 import kafka.serializer._
 import org.I0Itec.zkclient.ZkClient
@@ -32,16 +31,16 @@ import org.apache.log4j.{Logger, Level}
 import kafka.utils.TestUtils._
 import kafka.common.MessageStreamsExistException
 
-class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends NetworkTestHarness with KafkaServerTestHarness with Logging {
 
   val RebalanceBackoffMs = 5000
   var dirs : ZKGroupTopicDirs = null
-  val zookeeperConnect = TestZKUtils.zookeeperConnect
+  val zookeeperConnect = ZookeeperConsumerConnectorTest.this.zkConnect
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
   val configs =
-    for(props <- TestUtils.createBrokerConfigs(numNodes))
+    for(props <- TestUtils.createBrokerConfigs(getPorts(numNodes), zkPort))
     yield new KafkaConfig(props) {
       override val zkConnect = zookeeperConnect
       override val numPartitions = numParts
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 95303e0..dc8e632 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -17,21 +17,19 @@
 
 package kafka.integration
 
-import kafka.utils.{ZKGroupTopicDirs, Logging}
+import kafka.utils.{NetworkTestHarness, ZKGroupTopicDirs, Logging, TestUtils}
 import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
 import kafka.server._
-import kafka.utils.TestUtils
 import kafka.serializer._
 import kafka.producer.{Producer, KeyedMessage}
 
 import org.junit.Test
 import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
 import junit.framework.Assert._
 
-class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
+class AutoOffsetResetTest extends NetworkTestHarness with KafkaServerTestHarness with Logging {
 
-  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, getPort(), zkPort)))
 
   val topic = "test_topic"
   val group = "default_group"
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 25845ab..2b581b7 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -24,18 +24,17 @@ import junit.framework.Assert._
 
 import kafka.cluster._
 import kafka.server._
-import org.scalatest.junit.JUnit3Suite
 import kafka.consumer._
 import kafka.serializer._
 import kafka.producer.{KeyedMessage, Producer}
 import kafka.utils.TestUtils._
-import kafka.utils.TestUtils
+import kafka.utils.{NetworkTestHarness, TestUtils}
 
-class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
+class FetcherTest extends NetworkTestHarness with KafkaServerTestHarness {
 
   val numNodes = 1
   val configs =
-    for(props <- TestUtils.createBrokerConfigs(numNodes))
+    for(props <- TestUtils.createBrokerConfigs(getPorts(numNodes), zkPort))
     yield new KafkaConfig(props)
   val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
   val topic = "topic"
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 3cf7c9b..b1d0d52 100644
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -18,15 +18,14 @@
 package kafka.integration
 
 import kafka.server._
-import kafka.utils.{Utils, TestUtils}
-import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{NetworkTestHarness, Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.KafkaException
 
 /**
  * A test harness that brings up some number of broker nodes
  */
-trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
+trait KafkaServerTestHarness extends NetworkTestHarness with ZooKeeperTestHarness {
 
   val configs: List[KafkaConfig]
   var servers: List[KafkaServer] = null
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index a5386a0..91e44d6 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -25,22 +25,21 @@ import kafka.producer.{KeyedMessage, Producer}
 import org.apache.log4j.{Level, Logger}
 import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
-import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.admin.AdminUtils
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
-import kafka.utils.{StaticPartitioner, TestUtils, Utils}
+import kafka.utils.{NetworkTestHarness, StaticPartitioner, TestUtils, Utils}
 import kafka.serializer.StringEncoder
 import java.util.Properties
 
 /**
  * End to end tests of the primitive apis against a local server
  */
-class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
+class PrimitiveApiTest extends NetworkTestHarness with ProducerConsumerTestHarness with ZooKeeperTestHarness {
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
-  val port = TestUtils.choosePort()
-  val props = TestUtils.createBrokerConfig(0, port)
+  val port = getPort()
+  val props = TestUtils.createBrokerConfig(0, port, zkPort)
   val config = new KafkaConfig(props)
   val configs = List(config)
 
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index 108c2e7..6955042 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -18,12 +18,11 @@
 package kafka.integration
 
 import kafka.consumer.SimpleConsumer
-import org.scalatest.junit.JUnit3Suite
 import kafka.producer.Producer
-import kafka.utils.{StaticPartitioner, TestUtils}
+import kafka.utils.{NetworkTestHarness, StaticPartitioner, TestUtils}
 import kafka.serializer.StringEncoder
 
-trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
+trait ProducerConsumerTestHarness extends NetworkTestHarness with KafkaServerTestHarness {
     val port: Int
     val host = "localhost"
     var producer: Producer[String, String] = null
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index eab4b5f..39d4f5b 100644
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -17,29 +17,28 @@
 
 package kafka.integration
 
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{NetworkTestHarness, Utils, TestUtils}
 import kafka.server.{KafkaConfig, KafkaServer}
 
-class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
+class RollingBounceTest extends NetworkTestHarness with ZooKeeperTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
   val brokerId3 = 2
   val brokerId4 = 3
 
-  val port1 = TestUtils.choosePort()
-  val port2 = TestUtils.choosePort()
-  val port3 = TestUtils.choosePort()
-  val port4 = TestUtils.choosePort()
+  val port1 = getPort()
+  val port2 = getPort()
+  val port3 = getPort()
+  val port4 = getPort()
 
   // controlled.shutdown.enable is true by default
-  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
+  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, zkPort)
+  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, zkPort)
+  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, zkPort)
+  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, zkPort)
   configProps4.put("controlled.shutdown.retry.backoff.ms", "100")
 
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 35dc071..6ddda27 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -17,21 +17,20 @@
 
 package kafka.integration
 
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.AdminUtils
 import java.nio.ByteBuffer
 import junit.framework.Assert._
 import kafka.cluster.Broker
-import kafka.utils.TestUtils
+import kafka.utils.{NetworkTestHarness, TestUtils}
 import kafka.utils.TestUtils._
 import kafka.server.{KafkaServer, KafkaConfig}
 import kafka.api.TopicMetadataRequest
 import kafka.common.ErrorMapping
 import kafka.client.ClientUtils
 
-class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val props = createBrokerConfigs(1)
+class TopicMetadataTest extends NetworkTestHarness with ZooKeeperTestHarness {
+  val props = createBrokerConfigs(getPorts(1), zkPort)
   val configs = props.map(p => new KafkaConfig(p))
   private var server1: KafkaServer = null
   val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port))
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index f44568c..dd1c053 100644
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -20,7 +20,6 @@ package kafka.integration
 import scala.collection.mutable.MutableList
 import scala.util.Random
 import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
 import junit.framework.Assert._
 import kafka.admin.AdminUtils
@@ -29,22 +28,22 @@ import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
 import kafka.producer.{KeyedMessage, Producer}
 import kafka.serializer.{DefaultEncoder, StringEncoder}
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.Utils
+import kafka.utils.{NetworkTestHarness, Utils}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 
-class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
+class UncleanLeaderElectionTest extends NetworkTestHarness with ZooKeeperTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
 
-  val port1 = choosePort()
-  val port2 = choosePort()
+  val port1 = getPort()
+  val port2 = getPort()
 
   // controlled shutdown is needed for these tests, but we can trim the retry count and backoff interval to
   // reduce test execution time
   val enableControlledShutdown = true
-  val configProps1 = createBrokerConfig(brokerId1, port1)
-  val configProps2 = createBrokerConfig(brokerId2, port2)
+  val configProps1 = createBrokerConfig(brokerId1, port1, zkPort)
+  val configProps2 = createBrokerConfig(brokerId2, port2, zkPort)
 
   for (configProps <- List(configProps1, configProps2)) {
     configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown))
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index d6248b0..146ce88 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -23,27 +23,25 @@ import kafka.serializer._
 import kafka.integration.KafkaServerTestHarness
 import kafka.producer.KeyedMessage
 import kafka.javaapi.producer.Producer
-import kafka.utils.IntEncoder
-import kafka.utils.{Logging, TestUtils}
+import kafka.utils.{NetworkTestHarness, IntEncoder, Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.MessageStreamsExistException
 
 import scala.collection.JavaConversions
 
-import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.{Level, Logger}
 import junit.framework.Assert._
 
 
-class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends NetworkTestHarness with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
 
   val zookeeperConnect = zkConnect
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
   val configs =
-    for(props <- TestUtils.createBrokerConfigs(numNodes))
+    for(props <- TestUtils.createBrokerConfigs(getPorts(numNodes), zkPort))
     yield new KafkaConfig(props) {
       override val numPartitions = numParts
       override val zkConnect = zookeeperConnect
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index d670ba7..3f71cc9 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -37,7 +37,8 @@ class LogTest extends JUnitSuite {
   @Before
   def setUp() {
     logDir = TestUtils.tempDir()
-    val props = TestUtils.createBrokerConfig(0, -1)
+    val zkPort = 2181
+    val props = TestUtils.createBrokerConfig(0, -1, zkPort)
     config = new KafkaConfig(props)
   }
 
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 4ea0489..55e414c 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -19,7 +19,7 @@ package kafka.log4j
 
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, Utils, Logging}
+import kafka.utils.{NetworkTestHarness, TestUtils, Utils, Logging}
 import kafka.api.FetchRequestBuilder
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
@@ -31,11 +31,10 @@ import java.io.File
 import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.{PropertyConfigurator, Logger}
 import org.junit.{After, Before, Test}
-import org.scalatest.junit.JUnit3Suite
 
 import junit.framework.Assert._
 
-class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
+class KafkaLog4jAppenderTest extends NetworkTestHarness with ZooKeeperTestHarness with Logging {
 
   var logDirZk: File = null
   var config: KafkaConfig = null
@@ -47,14 +46,14 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
 
   private val brokerZk = 0
 
-  private val ports = TestUtils.choosePorts(2)
+  private val ports = getPorts(2)
   private val portZk = ports(0)
 
   @Before
   override def setUp() {
     super.setUp()
 
-    val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk)
+    val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk, zkPort)
     val logDirZkPath = propsZk.getProperty("log.dir")
     logDirZk = new File(logDirZkPath)
     config = new KafkaConfig(propsZk)
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 5f4d852..b3d4c8d 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -35,7 +35,7 @@ class SocketServerTest extends JUnitSuite {
 
   val server: SocketServer = new SocketServer(0,
                                               host = null,
-                                              port = kafka.utils.TestUtils.choosePort,
+                                              port = kafka.utils.TestUtils.choosePorts(1).head,
                                               numProcessorThreads = 1,
                                               maxQueuedRequests = 50,
                                               sendBufferSize = 300000,
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 1db6ac3..1396f94 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -36,7 +36,8 @@ import scala.collection.mutable.ArrayBuffer
 import kafka.utils._
 
 class AsyncProducerTest extends JUnit3Suite {
-  val props = createBrokerConfigs(1)
+  val zkPort = 2181
+  val props = createBrokerConfigs(TestUtils.choosePorts(1), zkPort)
   val configs = props.map(p => new KafkaConfig(p))
 
   override def setUp() {
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index ce65dab..895dee1 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -19,7 +19,6 @@ package kafka.producer
 
 import org.apache.kafka.common.config.ConfigException
 import org.scalatest.TestFailedException
-import org.scalatest.junit.JUnit3Suite
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
@@ -37,10 +36,10 @@ import org.junit.Assert.assertEquals
 import kafka.common.{ErrorMapping, FailedToSendMessageException}
 import kafka.serializer.StringEncoder
 
-class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
+class ProducerTest extends NetworkTestHarness with ZooKeeperTestHarness with Logging{
   private val brokerId1 = 0
   private val brokerId2 = 1
-  private val ports = TestUtils.choosePorts(2)
+  private val ports = getPorts(2)
   private val (port1, port2) = (ports(0), ports(1))
   private var server1: KafkaServer = null
   private var server2: KafkaServer = null
@@ -49,10 +48,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
   private var servers = List.empty[KafkaServer]
 
-  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
+  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, zkPort, false)
   props1.put("num.partitions", "4")
   private val config1 = new KafkaConfig(props1)
-  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
+  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, zkPort, false)
   props2.put("num.partitions", "4")
   private val config2 = new KafkaConfig(props2)
 
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index fb61d55..e8eb082 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -26,15 +26,14 @@ import kafka.message._
 import kafka.server.KafkaConfig
 import kafka.utils._
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import kafka.api.ProducerResponseStatus
 import kafka.common.{TopicAndPartition, ErrorMapping}
 
-class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
+class SyncProducerTest extends NetworkTestHarness with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
   // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
-  val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1, false).head))
-  val zookeeperConnect = TestZKUtils.zookeeperConnect
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(getPorts(1), zkPort, false).head))
+  val zookeeperConnect = SyncProducerTest.this.zkConnect
 
   @Test
   def testReachableServer() {
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index f0c4a56..6883dd3 100644
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -17,23 +17,22 @@
 
 package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
-import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.utils.{NetworkTestHarness, ZkUtils, Utils, TestUtils}
 
-class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
-  var server : KafkaServer = null
+class AdvertiseBrokerTest extends NetworkTestHarness with ZooKeeperTestHarness {
   val brokerId = 0
   val advertisedHostName = "routable-host"
   val advertisedPort = 1234
 
+  val props = TestUtils.createBrokerConfig(brokerId, getPort(), zkPort)
+  props.put("advertised.host.name", advertisedHostName)
+  props.put("advertised.port", advertisedPort.toString)
+  var server: KafkaServer = null
+
   override def setUp() {
     super.setUp()
-    val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
-    props.put("advertised.host.name", advertisedHostName)
-    props.put("advertised.port", advertisedPort.toString)
-    
     server = TestUtils.createServer(new KafkaConfig(props))
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index ad12116..cad0fb2 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -23,11 +23,10 @@ import kafka.utils._
 import kafka.common._
 import kafka.log.LogConfig
 import kafka.admin.{AdminOperationException, AdminUtils}
-import org.scalatest.junit.JUnit3Suite
 
-class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
+class DynamicConfigChangeTest extends NetworkTestHarness with KafkaServerTestHarness {
   
-  override val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.choosePort)))
+  override val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, getPort(), zkPort)))
 
   @Test
   def testConfigChange() {
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 03a424d..c316fae 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -30,7 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
 
-  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
+  val zkPort = 2181
+  val configs = TestUtils.createBrokerConfigs(TestUtils.choosePorts(2), zkPort).map(new KafkaConfig(_))
   val topic = "foo"
   val logManagers = configs map { config =>
     TestUtils.createLogManager(
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index cd302aa..f58e79d 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -29,7 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 class IsrExpirationTest extends JUnit3Suite {
 
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
-  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+  val zkPort = 2181
+  val configs = TestUtils.createBrokerConfigs(TestUtils.choosePorts(2), zkPort).map(new KafkaConfig(_) {
     override val replicaLagTimeMaxMs = 100L
     override val replicaFetchWaitMaxMs = 100
     override val replicaLagMaxMessages = 10L
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2377abe..6b161b8 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -23,10 +23,11 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
 
 class KafkaConfigTest extends JUnit3Suite {
+  val zkPort = 2181
 
   @Test
   def testLogRetentionTimeHoursProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
     props.put("log.retention.hours", "1")
 
     val cfg = new KafkaConfig(props)
@@ -36,7 +37,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRetentionTimeMinutesProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
     props.put("log.retention.minutes", "30")
 
     val cfg = new KafkaConfig(props)
@@ -46,7 +47,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRetentionTimeMsProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
     props.put("log.retention.ms", "1800000")
 
     val cfg = new KafkaConfig(props)
@@ -56,7 +57,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRetentionTimeNoConfigProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
 
     val cfg = new KafkaConfig(props)
     assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -65,7 +66,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRetentionTimeBothMinutesAndHoursProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
     props.put("log.retention.minutes", "30")
     props.put("log.retention.hours", "1")
 
@@ -76,7 +77,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRetentionTimeBothMinutesAndMsProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
     props.put("log.retention.ms", "1800000")
     props.put("log.retention.minutes", "10")
 
@@ -90,7 +91,7 @@ class KafkaConfigTest extends JUnit3Suite {
     val port = 9999
     val hostName = "fake-host"
     
-    val props = TestUtils.createBrokerConfig(0, port)
+    val props = TestUtils.createBrokerConfig(0, port, zkPort)
     props.put("host.name", hostName)
     
     val serverConfig = new KafkaConfig(props)
@@ -105,7 +106,7 @@ class KafkaConfigTest extends JUnit3Suite {
     val advertisedHostName = "routable-host"
     val advertisedPort = 1234
     
-    val props = TestUtils.createBrokerConfig(0, port)
+    val props = TestUtils.createBrokerConfig(0, port, zkPort)
     props.put("advertised.host.name", advertisedHostName)
     props.put("advertised.port", advertisedPort.toString)
     
@@ -117,7 +118,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testUncleanLeaderElectionDefault() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
     val serverConfig = new KafkaConfig(props)
 
     assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
@@ -125,7 +126,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testUncleanElectionDisabled() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
     props.put("unclean.leader.election.enable", String.valueOf(false))
     val serverConfig = new KafkaConfig(props)
 
@@ -134,7 +135,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testUncleanElectionEnabled() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
     props.put("unclean.leader.election.enable", String.valueOf(true))
     val serverConfig = new KafkaConfig(props)
 
@@ -143,7 +144,7 @@ class KafkaConfigTest extends JUnit3Suite {
 
   @Test
   def testUncleanElectionInvalid() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
     props.put("unclean.leader.election.enable", "invalid")
 
     intercept[IllegalArgumentException] {
@@ -153,7 +154,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRollTimeMsProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
     props.put("log.roll.ms", "1800000")
 
     val cfg = new KafkaConfig(props)
@@ -163,7 +164,7 @@ class KafkaConfigTest extends JUnit3Suite {
   
   @Test
   def testLogRollTimeBothMsAndHoursProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
     props.put("log.roll.ms", "1800000")
     props.put("log.roll.hours", "1")
 
@@ -174,7 +175,7 @@ class KafkaConfigTest extends JUnit3Suite {
     
   @Test
   def testLogRollTimeNoConfigProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
+    val props = TestUtils.createBrokerConfig(0, 8181, zkPort)
 
     val cfg = new KafkaConfig(props)
     assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis																									)
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index c2ba07c..e4d0e8e 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -17,25 +17,25 @@
 
 package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.utils.{NetworkTestHarness, ZkUtils, Utils, TestUtils}
 import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch, ControllerChannelManager}
 import kafka.cluster.Broker
 import kafka.common.ErrorMapping
 import kafka.api._
 
-class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
+class LeaderElectionTest extends NetworkTestHarness with ZooKeeperTestHarness {
   val brokerId1 = 0
   val brokerId2 = 1
 
-  val port1 = TestUtils.choosePort()
-  val port2 = TestUtils.choosePort()
+  val port1 = getPort()
+  val port2 = getPort()
+  val extraControllerPort = getPort()
 
-  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
-  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
+  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, zkPort, false)
+  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, zkPort, false)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   var staleControllerEpochDetected = false
@@ -117,7 +117,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // start another controller
     val controllerId = 2
-    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
+    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, extraControllerPort, zkPort))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
     val controllerContext = new ControllerContext(zkClient, 6000)
     controllerContext.liveBrokers = brokers.toSet
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index c06ee75..af7d1e9 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -24,7 +24,6 @@ import java.util.{Random, Properties}
 import kafka.consumer.SimpleConsumer
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.zk.ZooKeeperTestHarness
-import org.scalatest.junit.JUnit3Suite
 import kafka.admin.AdminUtils
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._
@@ -33,7 +32,7 @@ import org.junit.After
 import org.junit.Before
 import org.junit.Test
 
-class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
+class LogOffsetTest extends NetworkTestHarness with ZooKeeperTestHarness {
   val random = new Random() 
   var logDir: File = null
   var topicLogDir: File = null
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index d5d351c..7a2042e 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -17,8 +17,7 @@
 package kafka.server
 
 import kafka.utils.TestUtils._
-import kafka.utils.IntEncoder
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{NetworkTestHarness, IntEncoder, Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common._
 import kafka.producer.{KeyedMessage, Producer}
@@ -26,12 +25,11 @@ import kafka.serializer.StringEncoder
 
 import java.io.File
 
-import org.scalatest.junit.JUnit3Suite
 import org.junit.Assert._
 
-class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
+class LogRecoveryTest extends NetworkTestHarness with ZooKeeperTestHarness {
 
-  val configs = TestUtils.createBrokerConfigs(2, false).map(new KafkaConfig(_) {
+  val configs = TestUtils.createBrokerConfigs(getPorts(2), zkPort, false).map(new KafkaConfig(_) {
     override val replicaLagTimeMaxMs = 5000L
     override val replicaLagMaxMessages = 10L
     override val replicaFetchWaitMaxMs = 1000
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 8c5364f..1735de8 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -24,14 +24,13 @@ import java.util.Properties
 import kafka.consumer.SimpleConsumer
 import org.junit.{After, Before, Test}
 import kafka.zk.ZooKeeperTestHarness
-import org.scalatest.junit.JUnit3Suite
 import kafka.api.{ConsumerMetadataRequest, OffsetCommitRequest, OffsetFetchRequest}
 import kafka.utils.TestUtils._
 import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
 import scala.util.Random
 import scala.collection._
 
-class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
+class OffsetCommitTest extends NetworkTestHarness with ZooKeeperTestHarness {
   val random: Random = new Random()
   var logDir: File = null
   var topicLogDir: File = null
@@ -45,7 +44,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    val config: Properties = createBrokerConfig(1, brokerPort)
+    val config: Properties = createBrokerConfig(1, brokerPort, zkPort)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index da4bafc..af98ce8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -17,17 +17,16 @@
 
 package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
 import kafka.serializer.StringEncoder
-import kafka.utils.TestUtils
+import kafka.utils.{NetworkTestHarness, TestUtils}
 import junit.framework.Assert._
 import kafka.common._
 
-class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
-  val props = createBrokerConfigs(2,false)
+class ReplicaFetchTest extends NetworkTestHarness with ZooKeeperTestHarness  {
+  val props = createBrokerConfigs(getPorts(2), zkPort, false)
   val configs = props.map(p => new KafkaConfig(p))
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a9c4ddc..3f0dd05 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -30,10 +30,12 @@ import org.junit.Test
 class ReplicaManagerTest extends JUnit3Suite {
 
   val topic = "test-topic"
+  val port = TestUtils.choosePorts(1).head
+  val zkPort = 2181
 
   @Test
   def testHighWaterMarkDirectoryMapping() {
-    val props = TestUtils.createBrokerConfig(1)
+    val props = TestUtils.createBrokerConfig(1, port, zkPort)
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
@@ -46,7 +48,7 @@ class ReplicaManagerTest extends JUnit3Suite {
 
   @Test
   def testHighwaterMarkRelativeDirectoryMapping() {
-    val props = TestUtils.createBrokerConfig(1)
+    val props = TestUtils.createBrokerConfig(1, port, zkPort)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 3804a11..5e5e7ea 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 import kafka.zk.ZooKeeperTestHarness
 import kafka.consumer.SimpleConsumer
 import kafka.producer._
-import kafka.utils.{IntEncoder, TestUtils, Utils}
+import kafka.utils.{NetworkTestHarness, IntEncoder, TestUtils, Utils}
 import kafka.utils.TestUtils._
 import kafka.api.FetchRequestBuilder
 import kafka.message.ByteBufferMessageSet
@@ -28,12 +28,11 @@ import kafka.serializer.StringEncoder
 import java.io.File
 
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import junit.framework.Assert._
 
-class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val port = TestUtils.choosePort
-  val props = TestUtils.createBrokerConfig(0, port)
+class ServerShutdownTest extends NetworkTestHarness with ZooKeeperTestHarness {
+  val port = getPort()
+  val props = TestUtils.createBrokerConfig(0, port, zkPort)
   val config = new KafkaConfig(props)
 
   val host = "localhost"
@@ -103,7 +102,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testCleanShutdownWithDeleteTopicEnabled() {
-    val newProps = TestUtils.createBrokerConfig(0, port)
+    val newProps = TestUtils.createBrokerConfig(0, port, zkPort)
     newProps.setProperty("delete.topic.enable", "true")
     val newConfig = new KafkaConfig(newProps)
     var server = new KafkaServer(newConfig)
@@ -116,7 +115,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testCleanShutdownAfterFailedStartup() {
-    val newProps = TestUtils.createBrokerConfig(0, port)
+    val newProps = TestUtils.createBrokerConfig(0, port, zkPort)
     newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
     val newConfig = new KafkaConfig(newProps)
     var server = new KafkaServer(newConfig)
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index a0ed485..b0eca12 100644
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -17,26 +17,23 @@
 
 package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
 import kafka.zk
-import kafka.utils.ZkUtils
-import kafka.utils.Utils
-import kafka.utils.TestUtils
+import kafka.utils.{NetworkTestHarness, ZkUtils, Utils, TestUtils}
 
 import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
 
-class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
-  var server : KafkaServer = null
+class ServerStartupTest extends NetworkTestHarness with ZooKeeperTestHarness {
   val brokerId = 0
   val zookeeperChroot = "/kafka-chroot-for-unittest"
 
+  val props = TestUtils.createBrokerConfig(brokerId, getPort(), zkPort)
+  val zooKeeperConnect = props.get("zookeeper.connect")
+  props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
+  var server: KafkaServer = null
+
   override def setUp() {
     super.setUp()
-    val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
-    val zooKeeperConnect = props.get("zookeeper.connect")
-    props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
-
     server = TestUtils.createServer(new KafkaConfig(props))
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 09ed8f5..ac3f549 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -32,7 +32,8 @@ import org.scalatest.junit.JUnit3Suite
 
 class SimpleFetchTest extends JUnit3Suite {
 
-  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+  val zkPort = 2181
+  val configs = TestUtils.createBrokerConfigs(TestUtils.choosePorts(2), zkPort).map(new KafkaConfig(_) {
     override val replicaLagTimeMaxMs = 100L
     override val replicaFetchWaitMaxMs = 100
     override val replicaLagMaxMessages = 10L
diff --git a/core/src/test/scala/unit/kafka/utils/NetworkTestHarness.scala b/core/src/test/scala/unit/kafka/utils/NetworkTestHarness.scala
new file mode 100644
index 0000000..92a7509
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/NetworkTestHarness.scala
@@ -0,0 +1,29 @@
+package kafka.utils
+
+import org.scalatest.junit.JUnit3Suite
+
+abstract class NetworkTestHarness(val numTestPorts: Int = 5) extends JUnit3Suite {
+  // This *must* be lazy to allow overriding by subclasses
+  lazy private val preallocatedPorts: List[Int] = TestUtils.choosePorts(numTestPorts)
+  lazy val availablePorts = collection.mutable.Queue(preallocatedPorts: _*)
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  def getPort(): Int = {
+    availablePorts.dequeue()
+  }
+
+  def getPorts(n: Int): List[Int] = {
+    (0 until n).map(_ => getPort()).toList
+  }
+
+  def freePort(port: Int) {
+    availablePorts.enqueue(port)
+  }
+
+  def freePorts(ports: List[Int]) {
+    availablePorts.enqueue(ports: _*)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 84e0855..533faa5 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -21,13 +21,12 @@ import kafka.server.{ReplicaFetcherManager, KafkaConfig}
 import kafka.api.LeaderAndIsr
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.TopicAndPartition
-import org.scalatest.junit.JUnit3Suite
 import org.junit.Assert._
 import org.junit.Test
 import org.easymock.EasyMock
 
 
-class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ReplicationUtilsTest extends NetworkTestHarness with ZooKeeperTestHarness {
   val topic = "my-topic-test"
   val partitionId = 0
   val brokerId = 1
@@ -42,6 +41,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
   val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
     "versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2)))
 
+  val configs = TestUtils.createBrokerConfigs(getPorts(1), zkPort).map(new KafkaConfig(_))
 
   override def setUp() {
     super.setUp()
@@ -50,7 +50,6 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testUpdateLeaderAndIsr() {
-    val configs = TestUtils.createBrokerConfigs(1).map(new KafkaConfig(_))
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
     EasyMock.expect(log)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index dd3640f..86ec792 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -63,7 +63,9 @@ object TestUtils extends Logging {
   val random = new Random()
 
   /**
-   * Choose a number of random available ports
+   * Choose a number of random available ports. Note that you *must* allocate all your ports for a single test with a
+   * single call to choosePorts OR ensure that any ports you have already allocated are actively in use so choosePorts()
+   * will not reuse them.
    */
   def choosePorts(count: Int): List[Int] = {
     val sockets =
@@ -76,11 +78,6 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Choose an available port
-   */
-  def choosePort(): Int = choosePorts(1).head
-
-  /**
    * Create a temporary directory
    */
   def tempDir(): File = {
@@ -137,10 +134,10 @@ object TestUtils extends Logging {
   /**
    * Create a test config for the given node id
    */
-  def createBrokerConfigs(numConfigs: Int,
+  def createBrokerConfigs(ports: List[Int], zkPort: Int,
     enableControlledShutdown: Boolean = true): List[Properties] = {
-    for((port, node) <- choosePorts(numConfigs).zipWithIndex)
-    yield createBrokerConfig(node, port, enableControlledShutdown)
+    for((port, node) <- ports.zipWithIndex)
+    yield createBrokerConfig(node, port, zkPort, enableControlledShutdown)
   }
 
   def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
@@ -150,14 +147,14 @@ object TestUtils extends Logging {
   /**
    * Create a test config for the given node id
    */
-  def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
+  def createBrokerConfig(nodeId: Int, port: Int, zkPort: Int,
     enableControlledShutdown: Boolean = true): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)
     props.put("host.name", "localhost")
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
-    props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
+    props.put("zookeeper.connect", TestZKUtils.zookeeperConnect(zkPort))
     props.put("replica.socket.timeout.ms", "1500")
     props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
     props
@@ -723,7 +720,7 @@ object TestUtils extends Logging {
 }
 
 object TestZKUtils {
-  val zookeeperConnect = "127.0.0.1:" + TestUtils.choosePort()
+  def zookeeperConnect(port: Int) = "127.0.0.1:" + port
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 85eec6f..baec705 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -19,12 +19,10 @@ package kafka.zk
 
 import kafka.consumer.ConsumerConfig
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, ZKStringSerializer}
-import kafka.utils.TestUtils
+import kafka.utils.{NetworkTestHarness, ZkUtils, ZKStringSerializer, TestUtils}
 import org.junit.Assert
-import org.scalatest.junit.JUnit3Suite
 
-class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ZKEphemeralTest extends NetworkTestHarness with ZooKeeperTestHarness {
   var zkSessionTimeoutMs = 1000
 
   def testEphemeralNodeCleanup = {
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 67d9c4b..9269bcb 100644
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -17,12 +17,12 @@
 
 package kafka.zk
 
-import org.scalatest.junit.JUnit3Suite
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}
+import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils, NetworkTestHarness}
 
-trait ZooKeeperTestHarness extends JUnit3Suite {
-  val zkConnect: String = TestZKUtils.zookeeperConnect
+trait ZooKeeperTestHarness extends NetworkTestHarness {
+  val zkPort: Int = getPort()
+  val zkConnect: String = TestZKUtils.zookeeperConnect(zkPort)
   var zookeeper: EmbeddedZookeeper = null
   var zkClient: ZkClient = null
   val zkConnectionTimeout = 6000
-- 
2.1.2

