From 6e7f1bd611c82115788bed75571b226cf558d155 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Wed, 29 Oct 2014 13:48:18 -0700 Subject: [PATCH] KAFKA-1501 Ensure tests allocate all ports simultaneously to ensure ports won't be reused. --- .../kafka/api/ProducerCompressionTest.scala | 9 +++--- .../kafka/api/ProducerFailureHandlingTest.scala | 8 ++--- .../integration/kafka/api/ProducerSendTest.scala | 9 +++--- .../scala/unit/kafka/admin/AddPartitionsTest.scala | 21 +++++++------ .../test/scala/unit/kafka/admin/AdminTest.scala | 35 +++++++++++++++------- .../scala/unit/kafka/admin/DeleteTopicTest.scala | 19 ++++++++---- .../scala/unit/kafka/admin/TopicCommandTest.scala | 7 ++--- .../unit/kafka/consumer/ConsumerIteratorTest.scala | 7 ++--- .../consumer/ZookeeperConsumerConnectorTest.scala | 7 ++--- .../kafka/integration/AutoOffsetResetTest.scala | 8 ++--- .../scala/unit/kafka/integration/FetcherTest.scala | 7 ++--- .../kafka/integration/KafkaServerTestHarness.scala | 5 ++-- .../unit/kafka/integration/PrimitiveApiTest.scala | 9 +++--- .../integration/ProducerConsumerTestHarness.scala | 5 ++-- .../unit/kafka/integration/RollingBounceTest.scala | 21 +++++++------ .../unit/kafka/integration/TopicMetadataTest.scala | 7 ++--- .../integration/UncleanLeaderElectionTest.scala | 13 ++++---- .../consumer/ZookeeperConsumerConnectorTest.scala | 8 ++--- core/src/test/scala/unit/kafka/log/LogTest.scala | 3 +- .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala | 9 +++--- .../scala/unit/kafka/metrics/MetricsTest.scala | 8 ++--- .../unit/kafka/network/SocketServerTest.scala | 2 +- .../unit/kafka/producer/AsyncProducerTest.scala | 3 +- .../scala/unit/kafka/producer/ProducerTest.scala | 9 +++--- .../unit/kafka/producer/SyncProducerTest.scala | 7 ++--- .../unit/kafka/server/AdvertiseBrokerTest.scala | 15 +++++----- .../kafka/server/DynamicConfigChangeTest.scala | 5 ++-- .../server/HighwatermarkPersistenceTest.scala | 3 +- .../unit/kafka/server/ISRExpirationTest.scala | 3 +- .../scala/unit/kafka/server/KafkaConfigTest.scala | 31 +++++++++---------- .../unit/kafka/server/LeaderElectionTest.scala | 16 +++++----- .../scala/unit/kafka/server/LogOffsetTest.scala | 3 +- .../scala/unit/kafka/server/LogRecoveryTest.scala | 8 ++--- .../scala/unit/kafka/server/OffsetCommitTest.scala | 5 ++-- .../scala/unit/kafka/server/ReplicaFetchTest.scala | 7 ++--- .../unit/kafka/server/ReplicaManagerTest.scala | 6 ++-- .../unit/kafka/server/ServerShutdownTest.scala | 13 ++++---- .../unit/kafka/server/ServerStartupTest.scala | 16 +++++----- .../scala/unit/kafka/server/SimpleFetchTest.scala | 3 +- .../unit/kafka/utils/NetworkTestHarness.scala | 29 ++++++++++++++++++ .../unit/kafka/utils/ReplicationUtilsTest.scala | 5 ++-- .../test/scala/unit/kafka/utils/TestUtils.scala | 21 ++++++------- .../test/scala/unit/kafka/zk/ZKEphemeralTest.scala | 6 ++-- .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 8 ++--- 44 files changed, 234 insertions(+), 215 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/utils/NetworkTestHarness.scala diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index 6379f2b..dd9750e 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -19,7 +19,6 @@ package kafka.api.test import java.util.{Properties, Collection, ArrayList} -import org.scalatest.junit.JUnit3Suite import 
org.junit.runners.Parameterized import org.junit.runner.RunWith import org.junit.runners.Parameterized.Parameters @@ -32,18 +31,18 @@ import kafka.server.{KafkaConfig, KafkaServer} import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.zk.ZooKeeperTestHarness -import kafka.utils.{Utils, TestUtils} +import kafka.utils.{NetworkTestHarness, Utils, TestUtils} import scala.Array @RunWith(value = classOf[Parameterized]) -class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooKeeperTestHarness { +class ProducerCompressionTest(compression: String) extends NetworkTestHarness with ZooKeeperTestHarness { private val brokerId = 0 - private val port = TestUtils.choosePort + private val port = getPort() private var server: KafkaServer = null - private val props = TestUtils.createBrokerConfig(brokerId, port) + private val props = TestUtils.createBrokerConfig(brokerId, port, zkPort) private val config = new KafkaConfig(props) private val topic = "topic" diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index a913fe5..cfa719b 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -29,21 +29,21 @@ import kafka.common.Topic import kafka.consumer.SimpleConsumer import kafka.server.KafkaConfig import kafka.integration.KafkaServerTestHarness -import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils} +import kafka.utils.{NetworkTestHarness, TestZKUtils, ShutdownableThread, TestUtils} import org.apache.kafka.common.KafkaException import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasException} import org.apache.kafka.clients.producer._ -class ProducerFailureHandlingTest extends KafkaServerTestHarness { +class ProducerFailureHandlingTest extends NetworkTestHarness with KafkaServerTestHarness { private val producerBufferSize = 30000 private val serverMessageMaxBytes = producerBufferSize/2 val numServers = 2 val configs = - for(props <- TestUtils.createBrokerConfigs(numServers, false)) + for(props <- TestUtils.createBrokerConfigs(getPorts(numServers), zkPort, false)) yield new KafkaConfig(props) { - override val zkConnect = TestZKUtils.zookeeperConnect + override val zkConnect = ProducerFailureHandlingTest.this.zkConnect override val autoCreateTopicsEnable = false override val messageMaxBytes = serverMessageMaxBytes } diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index d407af9..ea8743a 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -20,24 +20,23 @@ package kafka.api.test import java.lang.{Integer, IllegalArgumentException} import org.apache.kafka.clients.producer._ -import org.scalatest.junit.JUnit3Suite import org.junit.Test import org.junit.Assert._ import kafka.server.KafkaConfig -import kafka.utils.{TestZKUtils, TestUtils} +import kafka.utils.{NetworkTestHarness, TestZKUtils, TestUtils} import kafka.consumer.SimpleConsumer import kafka.api.FetchRequestBuilder import kafka.message.Message import kafka.integration.KafkaServerTestHarness -class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { +class ProducerSendTest extends NetworkTestHarness with KafkaServerTestHarness { val numServers = 
2 val configs = - for(props <- TestUtils.createBrokerConfigs(numServers, false)) + for(props <- TestUtils.createBrokerConfigs(getPorts(numServers), zkPort, false)) yield new KafkaConfig(props) { - override val zkConnect = TestZKUtils.zookeeperConnect + override val zkConnect = ProducerSendTest.this.zkConnect override val numPartitions = 4 } diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 1bf2667..959bb3a 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -17,30 +17,29 @@ package kafka.admin -import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import junit.framework.Assert._ -import kafka.utils.{ZkUtils, Utils, TestUtils} +import kafka.utils.{NetworkTestHarness, ZkUtils, Utils, TestUtils} import kafka.cluster.Broker import kafka.client.ClientUtils import kafka.server.{KafkaConfig, KafkaServer} -class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { +class AddPartitionsTest extends NetworkTestHarness with ZooKeeperTestHarness { val brokerId1 = 0 val brokerId2 = 1 val brokerId3 = 2 val brokerId4 = 3 - val port1 = TestUtils.choosePort() - val port2 = TestUtils.choosePort() - val port3 = TestUtils.choosePort() - val port4 = TestUtils.choosePort() + val port1 = getPort() + val port2 = getPort() + val port3 = getPort() + val port4 = getPort() - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false) - val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, false) - val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, false) + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, zkPort, false) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, zkPort, false) + val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, zkPort, false) + val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, zkPort, false) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] var brokers: Seq[Broker] = Seq.empty[Broker] diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index e289798..1c494cb 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -18,7 +18,6 @@ package kafka.admin import junit.framework.Assert._ import org.junit.Test -import org.scalatest.junit.JUnit3Suite import java.util.Properties import kafka.utils._ import kafka.log._ @@ -30,7 +29,7 @@ import java.io.File import TestUtils._ -class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { +class AdminTest extends NetworkTestHarness with ZooKeeperTestHarness with Logging { @Test def testReplicaAssignment() { @@ -145,7 +144,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) + val ports = getPorts(4) + val servers = TestUtils.createBrokerConfigs(ports, zkPort, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign 
partition 0 @@ -169,6 +169,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") servers.foreach(_.shutdown()) + freePorts(ports) } @Test @@ -176,7 +177,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) + val ports = getPorts(4) + val servers = TestUtils.createBrokerConfigs(ports, zkPort, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 @@ -200,6 +202,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { "New replicas should exist on brokers") servers.foreach(_.shutdown()) + freePorts(ports) } @Test @@ -207,7 +210,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) + val ports = getPorts(4) + val servers = TestUtils.createBrokerConfigs(ports, zkPort, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 @@ -230,13 +234,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") servers.foreach(_.shutdown()) + freePorts(ports) } @Test def testReassigningNonExistingPartition() { val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b))) + val ports = getPorts(4) + val servers = TestUtils.createBrokerConfigs(ports, zkPort, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -246,6 +252,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient) assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition)) servers.foreach(_.shutdown()) + freePorts(ports) } @Test @@ -262,7 +269,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) reassignPartitionsCommand.reassignPartitions // create brokers - val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new KafkaConfig(b))) + val ports = getPorts(2) + val servers = TestUtils.createBrokerConfigs(ports, zkPort, false).map(b => TestUtils.createServer(new KafkaConfig(b))) // wait until reassignment completes TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient), @@ -275,6 +283,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == 
newReplicas.toSet, "New replicas should exist on brokers") servers.foreach(_.shutdown()) + freePorts(ports) } @Test @@ -298,7 +307,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val partition = 1 val preferredReplica = 0 // create brokers - val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_)) + val ports = getPorts(3) + val serverConfigs = TestUtils.createBrokerConfigs(ports, zkPort, false).map(new KafkaConfig(_)) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) @@ -310,6 +320,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader)).get assertEquals("Preferred replica election failed", preferredReplica, newLeader) servers.foreach(_.shutdown()) + freePorts(ports) } @Test @@ -318,7 +329,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val topic = "test" val partition = 1 // create brokers - val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_)) + val ports = getPorts(3) + val serverConfigs = TestUtils.createBrokerConfigs(ports, zkPort, false).map(new KafkaConfig(_)) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // create the topic TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers) @@ -354,6 +366,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { } finally { servers.foreach(_.shutdown()) + freePorts(ports) } } @@ -365,7 +378,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { def testTopicConfigChange() { val partitions = 3 val topic = "my-topic" - val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0))) + val port = getPort() + val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, port, zkPort))) def makeConfig(messageSize: Int, retentionMs: Long) = { var props = new Properties() @@ -398,6 +412,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { } finally { server.shutdown() server.config.logDirs.map(Utils.rm(_)) + freePort(port) } } diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 29cc01b..888f5ef 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -16,10 +16,9 @@ */ package kafka.admin -import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import junit.framework.Assert._ -import kafka.utils.{ZkUtils, TestUtils} +import kafka.utils.{NetworkTestHarness, ZkUtils, TestUtils} import kafka.server.{KafkaServer, KafkaConfig} import org.junit.Test import kafka.common._ @@ -31,7 +30,7 @@ import kafka.producer.KeyedMessage import kafka.common.TopicAndPartition import kafka.api.PartitionOffsetRequestInfo -class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { +class DeleteTopicTest extends NetworkTestHarness with ZooKeeperTestHarness { @Test def testDeleteTopicWithAllAliveReplicas() { @@ -42,6 +41,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.deleteTopic(zkClient, topic) verifyTopicDeletion(topic, 
servers) servers.foreach(_.shutdown()) + freePorts(servers.map(_.socketServer.port).toList) } @Test @@ -67,6 +67,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { follower.startup() verifyTopicDeletion(topic, servers) servers.foreach(_.shutdown()) + freePorts(servers.map(_.socketServer.port).toList) } @Test @@ -94,6 +95,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { verifyTopicDeletion(topic, servers) servers.foreach(_.shutdown()) + freePorts(servers.map(_.socketServer.port).toList) } @Test @@ -101,7 +103,8 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" val topicAndPartition = TopicAndPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(4, false) + val ports = getPorts(4) + val brokerConfigs = TestUtils.createBrokerConfigs(ports, zkPort, false) brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) @@ -140,6 +143,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { follower.startup() verifyTopicDeletion(topic, servers) allServers.foreach(_.shutdown()) + freePorts(ports) } @Test @@ -163,6 +167,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty), "Replica logs not for new partition [test,1] not deleted after delete topic is complete.") servers.foreach(_.shutdown()) + freePorts(servers.map(_.socketServer.port).toList) } @Test @@ -180,6 +185,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { assertTrue("Replica logs not deleted after delete topic is complete", servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty)) servers.foreach(_.shutdown()) + freePorts(servers.map(_.socketServer.port).toList) } @Test @@ -200,6 +206,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created.") servers.foreach(_.shutdown()) + freePorts(servers.map(_.socketServer.port).toList) } @Test @@ -221,13 +228,13 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) servers.foreach(_.shutdown()) - + freePorts(servers.map(_.socketServer.port).toList) } private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topicAndPartition = TopicAndPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(3, false) + val brokerConfigs = TestUtils.createBrokerConfigs(getPorts(3), zkPort, false) brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index ac6dd20..1ca42c7 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -18,15 +18,12 @@ package 
kafka.admin import junit.framework.Assert._ import org.junit.Test -import org.scalatest.junit.JUnit3Suite -import kafka.utils.Logging -import kafka.utils.TestUtils +import kafka.utils.{NetworkTestHarness, Logging, TestUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import kafka.server.KafkaConfig import kafka.admin.TopicCommand.TopicCommandOptions -import kafka.utils.ZkUtils -class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { +class TopicCommandTest extends NetworkTestHarness with ZooKeeperTestHarness with Logging { @Test def testConfigPreservationAcrossPartitionAlteration() { diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index c0355cc..cf70bde 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -30,16 +30,15 @@ import kafka.utils._ import org.junit.Test import kafka.serializer._ import kafka.cluster.{Broker, Cluster} -import org.scalatest.junit.JUnit3Suite import kafka.integration.KafkaServerTestHarness -class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { +class ConsumerIteratorTest extends NetworkTestHarness with KafkaServerTestHarness { val numNodes = 1 val configs = - for(props <- TestUtils.createBrokerConfigs(numNodes)) + for(props <- TestUtils.createBrokerConfigs(getPorts(numNodes), zkPort)) yield new KafkaConfig(props) { - override val zkConnect = TestZKUtils.zookeeperConnect + override val zkConnect = ConsumerIteratorTest.this.zkConnect } val messages = new mutable.HashMap[Int, Seq[Message]] val topic = "topic" diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 8c4687b..28b7f97 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -21,7 +21,6 @@ import junit.framework.Assert._ import kafka.integration.KafkaServerTestHarness import kafka.server._ import scala.collection._ -import org.scalatest.junit.JUnit3Suite import kafka.message._ import kafka.serializer._ import org.I0Itec.zkclient.ZkClient @@ -31,16 +30,16 @@ import org.apache.log4j.{Logger, Level} import kafka.utils.TestUtils._ import kafka.common.MessageStreamsExistException -class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { +class ZookeeperConsumerConnectorTest extends NetworkTestHarness with KafkaServerTestHarness with Logging { val RebalanceBackoffMs = 5000 var dirs : ZKGroupTopicDirs = null - val zookeeperConnect = TestZKUtils.zookeeperConnect + val zookeeperConnect = ZookeeperConsumerConnectorTest.this.zkConnect val numNodes = 2 val numParts = 2 val topic = "topic1" val configs = - for(props <- TestUtils.createBrokerConfigs(numNodes)) + for(props <- TestUtils.createBrokerConfigs(getPorts(numNodes), zkPort)) yield new KafkaConfig(props) { override val zkConnect = zookeeperConnect override val numPartitions = numParts diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 95303e0..dc8e632 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -17,21 +17,19 @@ 
package kafka.integration -import kafka.utils.{ZKGroupTopicDirs, Logging} +import kafka.utils.{NetworkTestHarness, ZKGroupTopicDirs, Logging, TestUtils} import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer} import kafka.server._ -import kafka.utils.TestUtils import kafka.serializer._ import kafka.producer.{Producer, KeyedMessage} import org.junit.Test import org.apache.log4j.{Level, Logger} -import org.scalatest.junit.JUnit3Suite import junit.framework.Assert._ -class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging { +class AutoOffsetResetTest extends NetworkTestHarness with KafkaServerTestHarness with Logging { - val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0))) + val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, getPort(), zkPort))) val topic = "test_topic" val group = "default_group" diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 25845ab..2b581b7 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -24,18 +24,17 @@ import junit.framework.Assert._ import kafka.cluster._ import kafka.server._ -import org.scalatest.junit.JUnit3Suite import kafka.consumer._ import kafka.serializer._ import kafka.producer.{KeyedMessage, Producer} import kafka.utils.TestUtils._ -import kafka.utils.TestUtils +import kafka.utils.{NetworkTestHarness, TestUtils} -class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { +class FetcherTest extends NetworkTestHarness with KafkaServerTestHarness { val numNodes = 1 val configs = - for(props <- TestUtils.createBrokerConfigs(numNodes)) + for(props <- TestUtils.createBrokerConfigs(getPorts(numNodes), zkPort)) yield new KafkaConfig(props) val messages = new mutable.HashMap[Int, Seq[Array[Byte]]] val topic = "topic" diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 3cf7c9b..b1d0d52 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -18,15 +18,14 @@ package kafka.integration import kafka.server._ -import kafka.utils.{Utils, TestUtils} -import org.scalatest.junit.JUnit3Suite +import kafka.utils.{NetworkTestHarness, Utils, TestUtils} import kafka.zk.ZooKeeperTestHarness import kafka.common.KafkaException /** * A test harness that brings up some number of broker nodes */ -trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { +trait KafkaServerTestHarness extends NetworkTestHarness with ZooKeeperTestHarness { val configs: List[KafkaConfig] var servers: List[KafkaServer] = null diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index a5386a0..91e44d6 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -25,22 +25,21 @@ import kafka.producer.{KeyedMessage, Producer} import org.apache.log4j.{Level, Logger} import org.I0Itec.zkclient.ZkClient import kafka.zk.ZooKeeperTestHarness -import org.scalatest.junit.JUnit3Suite import scala.collection._ import kafka.admin.AdminUtils import kafka.common.{TopicAndPartition, ErrorMapping, 
UnknownTopicOrPartitionException, OffsetOutOfRangeException} -import kafka.utils.{StaticPartitioner, TestUtils, Utils} +import kafka.utils.{NetworkTestHarness, StaticPartitioner, TestUtils, Utils} import kafka.serializer.StringEncoder import java.util.Properties /** * End to end tests of the primitive apis against a local server */ -class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness { +class PrimitiveApiTest extends NetworkTestHarness with ProducerConsumerTestHarness with ZooKeeperTestHarness { val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) - val port = TestUtils.choosePort() - val props = TestUtils.createBrokerConfig(0, port) + val port = getPort() + val props = TestUtils.createBrokerConfig(0, port, zkPort) val config = new KafkaConfig(props) val configs = List(config) diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala index 108c2e7..6955042 100644 --- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala @@ -18,12 +18,11 @@ package kafka.integration import kafka.consumer.SimpleConsumer -import org.scalatest.junit.JUnit3Suite import kafka.producer.Producer -import kafka.utils.{StaticPartitioner, TestUtils} +import kafka.utils.{NetworkTestHarness, StaticPartitioner, TestUtils} import kafka.serializer.StringEncoder -trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness { +trait ProducerConsumerTestHarness extends NetworkTestHarness with KafkaServerTestHarness { val port: Int val host = "localhost" var producer: Producer[String, String] = null diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index eab4b5f..39d4f5b 100644 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -17,29 +17,28 @@ package kafka.integration -import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import junit.framework.Assert._ -import kafka.utils.{Utils, TestUtils} +import kafka.utils.{NetworkTestHarness, Utils, TestUtils} import kafka.server.{KafkaConfig, KafkaServer} -class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { +class RollingBounceTest extends NetworkTestHarness with ZooKeeperTestHarness { val brokerId1 = 0 val brokerId2 = 1 val brokerId3 = 2 val brokerId4 = 3 - val port1 = TestUtils.choosePort() - val port2 = TestUtils.choosePort() - val port3 = TestUtils.choosePort() - val port4 = TestUtils.choosePort() + val port1 = getPort() + val port2 = getPort() + val port3 = getPort() + val port4 = getPort() // controlled.shutdown.enable is true by default - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) - val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3) - val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4) + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, zkPort) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, zkPort) + val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, zkPort) + val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, zkPort) 
configProps4.put("controlled.shutdown.retry.backoff.ms", "100") var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 35dc071..6ddda27 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -17,21 +17,20 @@ package kafka.integration -import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.admin.AdminUtils import java.nio.ByteBuffer import junit.framework.Assert._ import kafka.cluster.Broker -import kafka.utils.TestUtils +import kafka.utils.{NetworkTestHarness, TestUtils} import kafka.utils.TestUtils._ import kafka.server.{KafkaServer, KafkaConfig} import kafka.api.TopicMetadataRequest import kafka.common.ErrorMapping import kafka.client.ClientUtils -class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { - val props = createBrokerConfigs(1) +class TopicMetadataTest extends NetworkTestHarness with ZooKeeperTestHarness { + val props = createBrokerConfigs(getPorts(1), zkPort) val configs = props.map(p => new KafkaConfig(p)) private var server1: KafkaServer = null val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port)) diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index ba3bcdc..24a74c7 100644 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -22,7 +22,6 @@ import org.apache.kafka.common.config.ConfigException import scala.collection.mutable.MutableList import scala.util.Random import org.apache.log4j.{Level, Logger} -import org.scalatest.junit.JUnit3Suite import java.util.Properties import junit.framework.Assert._ import kafka.admin.AdminUtils @@ -31,22 +30,22 @@ import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException} import kafka.producer.{KeyedMessage, Producer} import kafka.serializer.{DefaultEncoder, StringEncoder} import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.Utils +import kafka.utils.{NetworkTestHarness, Utils} import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness -class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { +class UncleanLeaderElectionTest extends NetworkTestHarness with ZooKeeperTestHarness { val brokerId1 = 0 val brokerId2 = 1 - val port1 = choosePort() - val port2 = choosePort() + val port1 = getPort() + val port2 = getPort() // controlled shutdown is needed for these tests, but we can trim the retry count and backoff interval to // reduce test execution time val enableControlledShutdown = true - val configProps1 = createBrokerConfig(brokerId1, port1) - val configProps2 = createBrokerConfig(brokerId2, port2) + val configProps1 = createBrokerConfig(brokerId1, port1, zkPort) + val configProps2 = createBrokerConfig(brokerId2, port2, zkPort) for (configProps <- List(configProps1, configProps2)) { configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown)) diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index d6248b0..146ce88 100644 --- 
a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -23,27 +23,25 @@ import kafka.serializer._ import kafka.integration.KafkaServerTestHarness import kafka.producer.KeyedMessage import kafka.javaapi.producer.Producer -import kafka.utils.IntEncoder -import kafka.utils.{Logging, TestUtils} +import kafka.utils.{NetworkTestHarness, IntEncoder, Logging, TestUtils} import kafka.consumer.{KafkaStream, ConsumerConfig} import kafka.zk.ZooKeeperTestHarness import kafka.common.MessageStreamsExistException import scala.collection.JavaConversions -import org.scalatest.junit.JUnit3Suite import org.apache.log4j.{Level, Logger} import junit.framework.Assert._ -class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging { +class ZookeeperConsumerConnectorTest extends NetworkTestHarness with KafkaServerTestHarness with ZooKeeperTestHarness with Logging { val zookeeperConnect = zkConnect val numNodes = 2 val numParts = 2 val topic = "topic1" val configs = - for(props <- TestUtils.createBrokerConfigs(numNodes)) + for(props <- TestUtils.createBrokerConfigs(getPorts(numNodes), zkPort)) yield new KafkaConfig(props) { override val numPartitions = numParts override val zkConnect = zookeeperConnect diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index d670ba7..3f71cc9 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -37,7 +37,8 @@ class LogTest extends JUnitSuite { @Before def setUp() { logDir = TestUtils.tempDir() - val props = TestUtils.createBrokerConfig(0, -1) + val zkPort = 2181 + val props = TestUtils.createBrokerConfig(0, -1, zkPort) config = new KafkaConfig(props) } diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index 4ea0489..55e414c 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -19,7 +19,7 @@ package kafka.log4j import kafka.consumer.SimpleConsumer import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{TestUtils, Utils, Logging} +import kafka.utils.{NetworkTestHarness, TestUtils, Utils, Logging} import kafka.api.FetchRequestBuilder import kafka.producer.async.MissingConfigException import kafka.serializer.Encoder @@ -31,11 +31,10 @@ import java.io.File import org.apache.log4j.spi.LoggingEvent import org.apache.log4j.{PropertyConfigurator, Logger} import org.junit.{After, Before, Test} -import org.scalatest.junit.JUnit3Suite import junit.framework.Assert._ -class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { +class KafkaLog4jAppenderTest extends NetworkTestHarness with ZooKeeperTestHarness with Logging { var logDirZk: File = null var config: KafkaConfig = null @@ -47,14 +46,14 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with private val brokerZk = 0 - private val ports = TestUtils.choosePorts(2) + private val ports = getPorts(2) private val portZk = ports(0) @Before override def setUp() { super.setUp() - val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk) + val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk, zkPort) val logDirZkPath = propsZk.getProperty("log.dir") logDirZk = 
new File(logDirZkPath) config = new KafkaConfig(propsZk) diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 3cf23b3..403a900 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -22,21 +22,19 @@ import junit.framework.Assert._ import kafka.integration.KafkaServerTestHarness import kafka.server._ import scala.collection._ -import org.scalatest.junit.JUnit3Suite import kafka.message._ import kafka.serializer._ import kafka.utils._ import kafka.utils.TestUtils._ -class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging { - val zookeeperConnect = TestZKUtils.zookeeperConnect +class MetricsTest extends NetworkTestHarness with KafkaServerTestHarness with Logging { val numNodes = 2 val numParts = 2 val topic = "topic1" val configs = - for (props <- TestUtils.createBrokerConfigs(numNodes)) + for (props <- TestUtils.createBrokerConfigs(getPorts(numNodes), zkPort)) yield new KafkaConfig(props) { - override val zkConnect = zookeeperConnect + override val zkConnect = MetricsTest.this.zkConnect override val numPartitions = numParts } val nMessages = 2 diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 5f4d852..b3d4c8d 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -35,7 +35,7 @@ class SocketServerTest extends JUnitSuite { val server: SocketServer = new SocketServer(0, host = null, - port = kafka.utils.TestUtils.choosePort, + port = kafka.utils.TestUtils.choosePorts(1).head, numProcessorThreads = 1, maxQueuedRequests = 50, sendBufferSize = 300000, diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 1db6ac3..1396f94 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -36,7 +36,8 @@ import scala.collection.mutable.ArrayBuffer import kafka.utils._ class AsyncProducerTest extends JUnit3Suite { - val props = createBrokerConfigs(1) + val zkPort = 2181 + val props = createBrokerConfigs(TestUtils.choosePorts(1), zkPort) val configs = props.map(p => new KafkaConfig(p)) override def setUp() { diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index ce65dab..895dee1 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -19,7 +19,6 @@ package kafka.producer import org.apache.kafka.common.config.ConfigException import org.scalatest.TestFailedException -import org.scalatest.junit.JUnit3Suite import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer} @@ -37,10 +36,10 @@ import org.junit.Assert.assertEquals import kafka.common.{ErrorMapping, FailedToSendMessageException} import kafka.serializer.StringEncoder -class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ +class ProducerTest extends NetworkTestHarness with ZooKeeperTestHarness with Logging{ private val brokerId1 = 0 private val brokerId2 = 1 - private val ports = TestUtils.choosePorts(2) + private val ports = getPorts(2) private val 
(port1, port2) = (ports(0), ports(1)) private var server1: KafkaServer = null private var server2: KafkaServer = null @@ -49,10 +48,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) private var servers = List.empty[KafkaServer] - private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false) + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, zkPort, false) props1.put("num.partitions", "4") private val config1 = new KafkaConfig(props1) - private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, zkPort, false) props2.put("num.partitions", "4") private val config2 = new KafkaConfig(props2) diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index d60d8e0..30a3130 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -26,15 +26,14 @@ import kafka.message._ import kafka.server.KafkaConfig import kafka.utils._ import org.junit.Test -import org.scalatest.junit.JUnit3Suite import kafka.api.ProducerResponseStatus import kafka.common.{TopicAndPartition, ErrorMapping} -class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { +class SyncProducerTest extends NetworkTestHarness with KafkaServerTestHarness { private var messageBytes = new Array[Byte](2); // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool. - val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1, false).head)) - val zookeeperConnect = TestZKUtils.zookeeperConnect + val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(getPorts(1), zkPort, false).head)) + val zookeeperConnect = SyncProducerTest.this.zkConnect @Test def testReachableServer() { diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala index f0c4a56..6883dd3 100644 --- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -17,23 +17,22 @@ package kafka.server -import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import junit.framework.Assert._ -import kafka.utils.{ZkUtils, Utils, TestUtils} +import kafka.utils.{NetworkTestHarness, ZkUtils, Utils, TestUtils} -class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { - var server : KafkaServer = null +class AdvertiseBrokerTest extends NetworkTestHarness with ZooKeeperTestHarness { val brokerId = 0 val advertisedHostName = "routable-host" val advertisedPort = 1234 + val props = TestUtils.createBrokerConfig(brokerId, getPort(), zkPort) + props.put("advertised.host.name", advertisedHostName) + props.put("advertised.port", advertisedPort.toString) + var server: KafkaServer = null + override def setUp() { super.setUp() - val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort()) - props.put("advertised.host.name", advertisedHostName) - props.put("advertised.port", advertisedPort.toString) - server = TestUtils.createServer(new KafkaConfig(props)) } diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 
ad12116..cad0fb2 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -23,11 +23,10 @@ import kafka.utils._ import kafka.common._ import kafka.log.LogConfig import kafka.admin.{AdminOperationException, AdminUtils} -import org.scalatest.junit.JUnit3Suite -class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { +class DynamicConfigChangeTest extends NetworkTestHarness with KafkaServerTestHarness { - override val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.choosePort))) + override val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, getPort(), zkPort))) @Test def testConfigChange() { diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 8913fc1..6f21f58 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -30,7 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean class HighwatermarkPersistenceTest extends JUnit3Suite { - val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_)) + val zkPort = 2181 + val configs = TestUtils.createBrokerConfigs(TestUtils.choosePorts(2), zkPort).map(new KafkaConfig(_)) val topic = "foo" val logManagers = configs map { config => TestUtils.createLogManager( diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index a703d27..25a2fe3 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -29,7 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean class IsrExpirationTest extends JUnit3Suite { var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]() - val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { + val zkPort = 2181 + val configs = TestUtils.createBrokerConfigs(TestUtils.choosePorts(2), zkPort).map(new KafkaConfig(_) { override val replicaLagTimeMaxMs = 100L override val replicaFetchWaitMaxMs = 100 override val replicaLagMaxMessages = 10L diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2377abe..6b161b8 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -23,10 +23,11 @@ import org.scalatest.junit.JUnit3Suite import kafka.utils.TestUtils class KafkaConfigTest extends JUnit3Suite { + val zkPort = 2181 @Test def testLogRetentionTimeHoursProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) props.put("log.retention.hours", "1") val cfg = new KafkaConfig(props) @@ -36,7 +37,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRetentionTimeMinutesProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) props.put("log.retention.minutes", "30") val cfg = new KafkaConfig(props) @@ -46,7 +47,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRetentionTimeMsProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) 
props.put("log.retention.ms", "1800000") val cfg = new KafkaConfig(props) @@ -56,7 +57,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRetentionTimeNoConfigProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) val cfg = new KafkaConfig(props) assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis) @@ -65,7 +66,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRetentionTimeBothMinutesAndHoursProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) props.put("log.retention.minutes", "30") props.put("log.retention.hours", "1") @@ -76,7 +77,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRetentionTimeBothMinutesAndMsProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) props.put("log.retention.ms", "1800000") props.put("log.retention.minutes", "10") @@ -90,7 +91,7 @@ class KafkaConfigTest extends JUnit3Suite { val port = 9999 val hostName = "fake-host" - val props = TestUtils.createBrokerConfig(0, port) + val props = TestUtils.createBrokerConfig(0, port, zkPort) props.put("host.name", hostName) val serverConfig = new KafkaConfig(props) @@ -105,7 +106,7 @@ class KafkaConfigTest extends JUnit3Suite { val advertisedHostName = "routable-host" val advertisedPort = 1234 - val props = TestUtils.createBrokerConfig(0, port) + val props = TestUtils.createBrokerConfig(0, port, zkPort) props.put("advertised.host.name", advertisedHostName) props.put("advertised.port", advertisedPort.toString) @@ -117,7 +118,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testUncleanLeaderElectionDefault() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) val serverConfig = new KafkaConfig(props) assertEquals(serverConfig.uncleanLeaderElectionEnable, true) @@ -125,7 +126,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testUncleanElectionDisabled() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) props.put("unclean.leader.election.enable", String.valueOf(false)) val serverConfig = new KafkaConfig(props) @@ -134,7 +135,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testUncleanElectionEnabled() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) props.put("unclean.leader.election.enable", String.valueOf(true)) val serverConfig = new KafkaConfig(props) @@ -143,7 +144,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testUncleanElectionInvalid() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) props.put("unclean.leader.election.enable", "invalid") intercept[IllegalArgumentException] { @@ -153,7 +154,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRollTimeMsProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) props.put("log.roll.ms", "1800000") val cfg = new KafkaConfig(props) @@ -163,7 +164,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRollTimeBothMsAndHoursProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) props.put("log.roll.ms", "1800000") props.put("log.roll.hours", "1") @@ -174,7 
+175,7 @@ class KafkaConfigTest extends JUnit3Suite { @Test def testLogRollTimeNoConfigProvided() { - val props = TestUtils.createBrokerConfig(0, 8181) + val props = TestUtils.createBrokerConfig(0, 8181, zkPort) val cfg = new KafkaConfig(props) assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis ) diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index c2ba07c..e4d0e8e 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -17,25 +17,25 @@ package kafka.server -import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import junit.framework.Assert._ -import kafka.utils.{ZkUtils, Utils, TestUtils} +import kafka.utils.{NetworkTestHarness, ZkUtils, Utils, TestUtils} import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch, ControllerChannelManager} import kafka.cluster.Broker import kafka.common.ErrorMapping import kafka.api._ -class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { +class LeaderElectionTest extends NetworkTestHarness with ZooKeeperTestHarness { val brokerId1 = 0 val brokerId2 = 1 - val port1 = TestUtils.choosePort() - val port2 = TestUtils.choosePort() + val port1 = getPort() + val port2 = getPort() + val extraControllerPort = getPort() - val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false) + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, zkPort, false) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, zkPort, false) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] var staleControllerEpochDetected = false @@ -117,7 +117,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // start another controller val controllerId = 2 - val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort())) + val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, extraControllerPort, zkPort)) val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port)) val controllerContext = new ControllerContext(zkClient, 6000) controllerContext.liveBrokers = brokers.toSet diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index c06ee75..af7d1e9 100644 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -24,7 +24,6 @@ import java.util.{Random, Properties} import kafka.consumer.SimpleConsumer import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} import kafka.zk.ZooKeeperTestHarness -import org.scalatest.junit.JUnit3Suite import kafka.admin.AdminUtils import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} import kafka.utils.TestUtils._ @@ -33,7 +32,7 @@ import org.junit.After import org.junit.Before import org.junit.Test -class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { +class LogOffsetTest extends NetworkTestHarness with ZooKeeperTestHarness { val random = new Random() var logDir: File = null var topicLogDir: File = null diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 
d5d351c..7a2042e 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -17,8 +17,7 @@ package kafka.server import kafka.utils.TestUtils._ -import kafka.utils.IntEncoder -import kafka.utils.{Utils, TestUtils} +import kafka.utils.{NetworkTestHarness, IntEncoder, Utils, TestUtils} import kafka.zk.ZooKeeperTestHarness import kafka.common._ import kafka.producer.{KeyedMessage, Producer} @@ -26,12 +25,11 @@ import kafka.serializer.StringEncoder import java.io.File -import org.scalatest.junit.JUnit3Suite import org.junit.Assert._ -class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { +class LogRecoveryTest extends NetworkTestHarness with ZooKeeperTestHarness { - val configs = TestUtils.createBrokerConfigs(2, false).map(new KafkaConfig(_) { + val configs = TestUtils.createBrokerConfigs(getPorts(2), zkPort, false).map(new KafkaConfig(_) { override val replicaLagTimeMaxMs = 5000L override val replicaLagMaxMessages = 10L override val replicaFetchWaitMaxMs = 1000 diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 8c5364f..1735de8 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -24,14 +24,13 @@ import java.util.Properties import kafka.consumer.SimpleConsumer import org.junit.{After, Before, Test} import kafka.zk.ZooKeeperTestHarness -import org.scalatest.junit.JUnit3Suite import kafka.api.{ConsumerMetadataRequest, OffsetCommitRequest, OffsetFetchRequest} import kafka.utils.TestUtils._ import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition} import scala.util.Random import scala.collection._ -class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { +class OffsetCommitTest extends NetworkTestHarness with ZooKeeperTestHarness { val random: Random = new Random() var logDir: File = null var topicLogDir: File = null @@ -45,7 +44,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { @Before override def setUp() { super.setUp() - val config: Properties = createBrokerConfig(1, brokerPort) + val config: Properties = createBrokerConfig(1, brokerPort, zkPort) val logDirPath = config.getProperty("log.dir") logDir = new File(logDirPath) time = new MockTime() diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index da4bafc..af98ce8 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -17,17 +17,16 @@ package kafka.server -import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import kafka.producer.KeyedMessage import kafka.serializer.StringEncoder -import kafka.utils.TestUtils +import kafka.utils.{NetworkTestHarness, TestUtils} import junit.framework.Assert._ import kafka.common._ -class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { - val props = createBrokerConfigs(2,false) +class ReplicaFetchTest extends NetworkTestHarness with ZooKeeperTestHarness { + val props = createBrokerConfigs(getPorts(2), zkPort, false) val configs = props.map(p => new KafkaConfig(p)) var brokers: Seq[KafkaServer] = null val topic1 = "foo" diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index faa9071..2269f15 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -30,10 +30,12 @@ import org.junit.Test
 class ReplicaManagerTest extends JUnit3Suite {
 
   val topic = "test-topic"
+  val port = TestUtils.choosePorts(1).head
+  val zkPort = 2181
 
   @Test
   def testHighWaterMarkDirectoryMapping() {
-    val props = TestUtils.createBrokerConfig(1)
+    val props = TestUtils.createBrokerConfig(1, port, zkPort)
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
@@ -49,7 +51,7 @@ class ReplicaManagerTest extends JUnit3Suite {
 
   @Test
   def testHighwaterMarkRelativeDirectoryMapping() {
-    val props = TestUtils.createBrokerConfig(1)
+    val props = TestUtils.createBrokerConfig(1, port, zkPort)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 67918f2..da6d4fe 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 import kafka.zk.ZooKeeperTestHarness
 import kafka.consumer.SimpleConsumer
 import kafka.producer._
-import kafka.utils.{IntEncoder, TestUtils, Utils}
+import kafka.utils.{NetworkTestHarness, IntEncoder, TestUtils, Utils}
 import kafka.utils.TestUtils._
 import kafka.api.FetchRequestBuilder
 import kafka.message.ByteBufferMessageSet
@@ -28,12 +28,11 @@ import kafka.serializer.StringEncoder
 import java.io.File
 
 import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
 import junit.framework.Assert._
 
-class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val port = TestUtils.choosePort
-  val props = TestUtils.createBrokerConfig(0, port)
+class ServerShutdownTest extends NetworkTestHarness with ZooKeeperTestHarness {
+  val port = getPort()
+  val props = TestUtils.createBrokerConfig(0, port, zkPort)
   val config = new KafkaConfig(props)
 
   val host = "localhost"
@@ -103,7 +102,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testCleanShutdownWithDeleteTopicEnabled() {
-    val newProps = TestUtils.createBrokerConfig(0, port)
+    val newProps = TestUtils.createBrokerConfig(0, port, zkPort)
     newProps.setProperty("delete.topic.enable", "true")
     val newConfig = new KafkaConfig(newProps)
     val server = new KafkaServer(newConfig)
@@ -116,7 +115,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testCleanShutdownAfterFailedStartup() {
-    val newProps = TestUtils.createBrokerConfig(0, port)
+    val newProps = TestUtils.createBrokerConfig(0, port, zkPort)
     newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
     val newConfig = new KafkaConfig(newProps)
     var server = new KafkaServer(newConfig)
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 8fe7cd4..fb3e12b 100644
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -17,24 +17,22 @@ package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
-import kafka.utils.ZkUtils
-import kafka.utils.Utils
-import kafka.utils.TestUtils
+import kafka.utils.{NetworkTestHarness, ZkUtils, Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
 
-class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
-  var server : KafkaServer = null
+class ServerStartupTest extends NetworkTestHarness with ZooKeeperTestHarness {
   val brokerId = 0
   val zookeeperChroot = "/kafka-chroot-for-unittest"
 
+  val props = TestUtils.createBrokerConfig(brokerId, getPort(), zkPort)
+  val zooKeeperConnect = props.get("zookeeper.connect")
+  props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
+
+  var server: KafkaServer = null
+
   override def setUp() {
     super.setUp()
-    val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
-    val zooKeeperConnect = props.get("zookeeper.connect")
-    props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
 
     server = TestUtils.createServer(new KafkaConfig(props))
   }
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ccf5e2e..0144806 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -35,7 +35,8 @@ import junit.framework.Assert._
 
 class SimpleFetchTest extends JUnit3Suite {
 
-  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+  val zkPort = 2181
+  val configs = TestUtils.createBrokerConfigs(TestUtils.choosePorts(2), zkPort).map(new KafkaConfig(_) {
     override val replicaLagTimeMaxMs = 100L
     override val replicaFetchWaitMaxMs = 100
     override val replicaLagMaxMessages = 10L
diff --git a/core/src/test/scala/unit/kafka/utils/NetworkTestHarness.scala b/core/src/test/scala/unit/kafka/utils/NetworkTestHarness.scala
new file mode 100644
index 0000000..92a7509
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/NetworkTestHarness.scala
@@ -0,0 +1,29 @@
+package kafka.utils
+
+import org.scalatest.junit.JUnit3Suite
+
+abstract class NetworkTestHarness(val numTestPorts: Int = 5) extends JUnit3Suite {
+  // This *must* be lazy to allow overriding by subclasses
+  lazy private val preallocatedPorts: List[Int] = TestUtils.choosePorts(numTestPorts)
+  lazy val availablePorts = collection.mutable.Queue(preallocatedPorts: _*)
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  def getPort(): Int = {
+    availablePorts.dequeue()
+  }
+
+  def getPorts(n: Int): List[Int] = {
+    (0 until n).map(_ => getPort()).toList
+  }
+
+  def freePort(port: Int) {
+    availablePorts.enqueue(port)
+  }
+
+  def freePorts(ports: List[Int]) {
+    availablePorts.enqueue(ports: _*)
+  }
+}
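NetworkTestHarness above is essentially a FIFO queue over the ports that TestUtils.choosePorts reserved once for the whole test: getPort() and getPorts(n) dequeue, freePort() and freePorts() enqueue again. No test in this patch returns ports yet, but a test that shuts a broker down early could do so roughly as follows; ExampleRestartTest and its body are illustrative only, not part of this patch:

    import kafka.server.KafkaConfig
    import kafka.utils.{NetworkTestHarness, TestUtils}
    import kafka.zk.ZooKeeperTestHarness

    // Hypothetical test, not part of this patch: recycling a port through the pool.
    class ExampleRestartTest extends NetworkTestHarness with ZooKeeperTestHarness {
      def testRestartOnFreshPort() {
        val firstPort = getPort() // dequeue a preallocated port
        val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, firstPort, zkPort)))
        server.shutdown()
        freePort(firstPort)       // hand it back; later getPort() calls may reuse it
        val secondPort = getPort()
        // ... bring up a replacement broker on secondPort ...
      }
    }
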
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 84e0855..533faa5 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -21,13 +21,12 @@ import kafka.server.{ReplicaFetcherManager, KafkaConfig}
 import kafka.api.LeaderAndIsr
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.TopicAndPartition
-import org.scalatest.junit.JUnit3Suite
 import org.junit.Assert._
 import org.junit.Test
 import org.easymock.EasyMock
 
-class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ReplicationUtilsTest extends NetworkTestHarness with ZooKeeperTestHarness {
   val topic = "my-topic-test"
   val partitionId = 0
   val brokerId = 1
@@ -42,6 +41,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
   val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" -> 1,
     "versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2)))
 
+  val configs = TestUtils.createBrokerConfigs(getPorts(1), zkPort).map(new KafkaConfig(_))
 
   override def setUp() {
     super.setUp()
@@ -50,7 +50,6 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testUpdateLeaderAndIsr() {
-    val configs = TestUtils.createBrokerConfigs(1).map(new KafkaConfig(_))
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
     EasyMock.expect(log)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 0da774d..21ab3ff 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -64,7 +64,9 @@ object TestUtils extends Logging {
   val random = new Random()
 
   /**
-   * Choose a number of random available ports
+   * Choose a number of random available ports. Note that you *must* allocate all your ports for a single test with a
+   * single call to choosePorts OR ensure that any ports you have already allocated are actively in use so choosePorts()
+   * will not reuse them.
    */
   def choosePorts(count: Int): List[Int] = {
     val sockets =
@@ -77,11 +79,6 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Choose an available port
-   */
-  def choosePort(): Int = choosePorts(1).head
-
-  /**
    * Create a temporary directory
    */
   def tempDir(): File = {
@@ -138,10 +135,10 @@ object TestUtils extends Logging {
   /**
    * Create a test config for the given node id
    */
-  def createBrokerConfigs(numConfigs: Int,
+  def createBrokerConfigs(ports: List[Int], zkPort: Int,
                           enableControlledShutdown: Boolean = true): List[Properties] = {
-    for((port, node) <- choosePorts(numConfigs).zipWithIndex)
-      yield createBrokerConfig(node, port, enableControlledShutdown)
+    for((port, node) <- ports.zipWithIndex)
+      yield createBrokerConfig(node, port, zkPort, enableControlledShutdown)
   }
 
   def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
@@ -151,14 +148,14 @@ object TestUtils extends Logging {
   /**
    * Create a test config for the given node id
    */
-  def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
+  def createBrokerConfig(nodeId: Int, port: Int, zkPort: Int,
                          enableControlledShutdown: Boolean = true): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)
     props.put("host.name", "localhost")
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
-    props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
+    props.put("zookeeper.connect", TestZKUtils.zookeeperConnect(zkPort))
     props.put("replica.socket.timeout.ms", "1500")
     props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
     props
@@ -791,7 +788,7 @@ object TestUtils extends Logging {
 }
 
 object TestZKUtils {
-  val zookeeperConnect = "127.0.0.1:" + TestUtils.choosePort()
+  def zookeeperConnect(port: Int) = "127.0.0.1:" + port
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
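The strengthened choosePorts comment above only holds if the implementation keeps every requested socket open until all of them have been bound, so the OS cannot assign the same ephemeral port twice within one call; separate calls give no such guarantee, which is exactly why the harness asks for everything up front. The body of choosePorts is elided in this hunk, so the following is a sketch of that allocation strategy under that assumption, not a copy of the real method:

    import java.net.ServerSocket

    // Sketch of simultaneous allocation (an assumption about choosePorts, not its verbatim body).
    object ChoosePortsSketch {
      def choosePorts(count: Int): List[Int] = {
        val sockets = (0 until count).map(_ => new ServerSocket(0)).toList // bind all sockets first
        val ports = sockets.map(_.getLocalPort)                            // record the assigned ports
        sockets.foreach(_.close())                                         // release only after all are known
        ports
      }
    }
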
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 85eec6f..baec705 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -19,12 +19,10 @@ package kafka.zk
 
 import kafka.consumer.ConsumerConfig
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, ZKStringSerializer}
-import kafka.utils.TestUtils
+import kafka.utils.{NetworkTestHarness, ZkUtils, ZKStringSerializer, TestUtils}
 import org.junit.Assert
-import org.scalatest.junit.JUnit3Suite
 
-class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ZKEphemeralTest extends NetworkTestHarness with ZooKeeperTestHarness {
   var zkSessionTimeoutMs = 1000
 
   def testEphemeralNodeCleanup = {
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 67d9c4b..9269bcb 100644
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -17,12 +17,12 @@ package kafka.zk
 
-import org.scalatest.junit.JUnit3Suite
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}
+import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils, NetworkTestHarness}
 
-trait ZooKeeperTestHarness extends JUnit3Suite {
-  val zkConnect: String = TestZKUtils.zookeeperConnect
+trait ZooKeeperTestHarness extends NetworkTestHarness {
+  val zkPort: Int = getPort()
+  val zkConnect: String = TestZKUtils.zookeeperConnect(zkPort)
   var zookeeper: EmbeddedZookeeper = null
   var zkClient: ZkClient = null
   val zkConnectionTimeout = 6000
-- 
2.1.3
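Because ZooKeeperTestHarness now takes zkPort from the same pool, initialization order is what keeps ZooKeeper and the brokers apart: the NetworkTestHarness constructor runs first, the trait's zkPort and zkConnect initializers claim the first pooled port, and only then do the concrete test's own fields run. A short walkthrough with a hypothetical ExampleOrderingTest, not part of this patch:

    import kafka.utils.{NetworkTestHarness, TestUtils}
    import kafka.zk.ZooKeeperTestHarness

    // Hypothetical test, not part of this patch: where each port comes from.
    class ExampleOrderingTest extends NetworkTestHarness with ZooKeeperTestHarness {
      // NetworkTestHarness's lazy pool is filled on the first getPort() call, which is
      // ZooKeeperTestHarness's zkPort initializer; zkConnect is then "127.0.0.1:" + zkPort.
      // Fields declared here initialize afterwards, so broker ports never collide with zkPort.
      val brokerPort = getPort()
      val brokerProps = TestUtils.createBrokerConfig(0, brokerPort, zkPort)
    }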