From 34be2c1040bcc650e75755463e66dd79e97d6930 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sun, 5 Apr 2015 11:07:55 -0700 Subject: [PATCH] KAFKA-1954. Speed up unit tests. --- .../scala/unit/kafka/admin/AddPartitionsTest.scala | 16 +--- .../test/scala/unit/kafka/admin/AdminTest.scala | 29 +++--- .../scala/unit/kafka/admin/DeleteTopicTest.scala | 19 ++-- .../kafka/integration/KafkaServerTestHarness.scala | 16 ++-- .../unit/kafka/integration/RollingBounceTest.scala | 21 +--- .../unit/kafka/integration/TopicMetadataTest.scala | 21 ++-- .../integration/UncleanLeaderElectionTest.scala | 19 ++-- .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala | 25 ++--- .../scala/unit/kafka/producer/ProducerTest.scala | 62 +++++------- .../unit/kafka/server/AdvertiseBrokerTest.scala | 35 +++---- .../unit/kafka/server/LeaderElectionTest.scala | 26 +---- .../scala/unit/kafka/server/LogOffsetTest.scala | 39 ++++---- .../scala/unit/kafka/server/LogRecoveryTest.scala | 22 ++--- .../scala/unit/kafka/server/ReplicaFetchTest.scala | 25 ++--- .../kafka/server/ServerGenerateBrokerIdTest.scala | 106 +++++++-------------- .../test/scala/unit/kafka/utils/TestUtils.scala | 4 +- 16 files changed, 178 insertions(+), 307 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 99ac923..d7e369f 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -18,19 +18,19 @@ package kafka.admin import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness +import kafka.integration.KafkaServerTestHarness import kafka.utils.TestUtils._ +import org.junit.Test import junit.framework.Assert._ import kafka.utils.{ZkUtils, Utils, TestUtils} import kafka.cluster.Broker import kafka.client.ClientUtils import kafka.server.{KafkaConfig, KafkaServer} -class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { - var configs: Seq[KafkaConfig] = null - var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] +class AddPartitionsTest extends JUnit3Suite with KafkaServerTestHarness { + val numNodes = 4 + def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps) var brokers: Seq[Broker] = Seq.empty[Broker] - val partitionId = 0 val topic1 = "new-topic1" @@ -40,10 +40,6 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { override def setUp() { super.setUp() - - configs = (0 until 4).map(i => KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zkConnect, enableControlledShutdown = false))) - // start all the servers - servers = configs.map(c => TestUtils.createServer(c)) brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort)) // create topics first @@ -54,8 +50,6 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { } override def tearDown() { - servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDirs)) super.tearDown() } diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 0305f70..13ab94b 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -145,7 +145,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).par.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))).toList.sortBy(_.config.brokerId) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 @@ -168,7 +168,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test @@ -176,7 +176,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).par.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))).toList.sortBy(_.config.brokerId) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 @@ -199,7 +199,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test @@ -207,7 +207,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).par.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))).toList.sortBy(_.config.brokerId) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 @@ -229,14 +229,14 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test def testReassigningNonExistingPartition() { val topic = "test" // create brokers - val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + val servers = TestUtils.createBrokerConfigs(4, zkConnect, false).par.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))).toList.sortBy(_.config.brokerId) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -245,7 +245,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient) assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition)) - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test @@ -262,7 +262,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas)) reassignPartitionsCommand.reassignPartitions // create brokers - val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + val servers = TestUtils.createBrokerConfigs(2, zkConnect, false).par.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))).toList.sortBy(_.config.brokerId) // wait until reassignment completes TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient), @@ -274,7 +274,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers) TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, "New replicas should exist on brokers") - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test @@ -301,6 +301,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + //sort in reverse order val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None).get @@ -309,7 +310,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { preferredReplicaElection.moveLeaderToPreferredReplica() val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = Some(currentLeader)).get assertEquals("Preferred replica election failed", preferredReplica, newLeader) - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test @@ -319,7 +320,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val partition = 1 // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps) - val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) + val servers = serverConfigs.par.map(s => TestUtils.createServer(s)).toList.sortBy(- _.config.brokerId) // create the topic TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers) @@ -353,7 +354,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) } finally { - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } } @@ -397,7 +398,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { checkConfig(2*maxMessageSize, 2 * retentionMs) } finally { server.shutdown() - server.config.logDirs.map(Utils.rm(_)) + server.config.logDirs.par.map(Utils.rm(_)) } } diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 61cc602..a976b95 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -61,7 +61,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // restart follower replica follower.startup() TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test @@ -88,7 +88,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { follower.startup() TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test @@ -134,7 +134,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas) follower.startup() TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) - allServers.foreach(_.shutdown()) + allServers.par.map(_.shutdown()) } @Test @@ -157,7 +157,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty), "Replica logs not for new partition [test,1] not deleted after delete topic is complete.") - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test @@ -174,7 +174,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // verify that new partition doesn't exist on any broker either assertTrue("Replica logs not deleted after delete topic is complete", servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty)) - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test @@ -194,7 +194,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // check if all replica logs are created TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created.") - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test @@ -215,7 +215,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // topic test should have a leader val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } @Test @@ -248,14 +248,13 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.deleteTopic(zkClient, "test") TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers) - servers.foreach(_.shutdown()) + servers.par.map(_.shutdown()) } private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = { val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false) - brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true") - ) + brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) createTestTopicAndCluster(topic,brokerConfigs) } diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 28e3122..b4cfb70 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -47,25 +47,25 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { } def serverForId(id: Int) = servers.find(s => s.config.brokerId == id) - + def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",") - + override def setUp() { super.setUp if(configs.size <= 0) throw new KafkaException("Must supply at least one server config.") - servers = configs.map(TestUtils.createServer(_)).toBuffer + servers = configs.par.map(TestUtils.createServer(_)).toBuffer.sortBy(_.config.brokerId) brokerList = TestUtils.getBrokerListStrFromServers(servers) alive = new Array[Boolean](servers.length) Arrays.fill(alive, true) } override def tearDown() { - servers.map(server => server.shutdown()) - servers.map(server => server.config.logDirs.map(Utils.rm(_))) + servers.par.map(server => server.shutdown()) + servers.par.map(server => server.config.logDirs.map(Utils.rm(_))) super.tearDown } - + /** * Pick a broker at random and kill it if it isn't already dead * Return the id of the broker killed @@ -79,7 +79,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { } index } - + /** * Restart any dead brokers */ diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index 130b205..45f1815 100644 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -18,32 +18,15 @@ package kafka.integration import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import junit.framework.Assert._ import kafka.utils.{Utils, TestUtils} import kafka.server.{KafkaConfig, KafkaServer} -class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { +class RollingBounceTest extends JUnit3Suite with KafkaServerTestHarness { val partitionId = 0 - var servers: Seq[KafkaServer] = null - - override def setUp() { - super.setUp() - // controlled.shutdown.enable is true by default - val configs = (0 until 4).map(i => TestUtils.createBrokerConfig(i, zkConnect)) - configs(3).put("controlled.shutdown.retry.backoff.ms", "100") - - // start all the servers - servers = configs.map(c => TestUtils.createServer(KafkaConfig.fromProps(c))) - } - - override def tearDown() { - servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDirs)) - super.tearDown() - } + def generateConfigs = TestUtils.createBrokerConfigs(4, zkConnect, enableControlledShutdown = true).map(KafkaConfig.fromProps) def testRollingBounce { // start all the brokers diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 56b1b8c..35d0d58 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -18,7 +18,6 @@ package kafka.integration import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness import kafka.admin.AdminUtils import java.nio.ByteBuffer import junit.framework.Assert._ @@ -30,20 +29,18 @@ import kafka.api.TopicMetadataRequest import kafka.common.ErrorMapping import kafka.client.ClientUtils -class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { - private var server1: KafkaServer = null +class TopicMetadataTest extends JUnit3Suite with KafkaServerTestHarness { + val numNodes = 1 + def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps) + var brokers: Seq[Broker] = null override def setUp() { super.setUp() - val props = createBrokerConfigs(1, zkConnect) - val configs = props.map(KafkaConfig.fromProps) - server1 = TestUtils.createServer(configs.head) - brokers = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort())) + brokers = Seq(new Broker(servers.head.config.brokerId, servers.head.config.hostName, servers.head.boundPort())) } override def tearDown() { - server1.shutdown() super.tearDown() } @@ -66,7 +63,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testBasicTopicMetadata { // create topic val topic = "test" - createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) + createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", 2000,0).topicsMetadata @@ -84,8 +81,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic val topic1 = "testGetAllTopicMetadata1" val topic2 = "testGetAllTopicMetadata2" - createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) - createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) + createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = servers) + createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = servers) // issue metadata request with empty list of topics var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata", @@ -116,7 +113,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { // wait for leader to be elected TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) // retry the metadata for the auto created topic topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 7c87b81..5c4829d 100644 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -67,7 +67,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { for (configProps <- List(configProps1, configProps2)) { configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown)) configProps.put("controlled.shutdown.max.retries", String.valueOf(1)) - configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000)) + configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(100)) } // temporarily set loggers to a higher level so that tests run quietly @@ -78,8 +78,8 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { } override def tearDown() { - servers.map(server => shutdownServer(server)) - servers.map(server => Utils.rm(server.config.logDirs)) + servers.par.map(server => shutdownServer(server)) + servers.par.map(server => Utils.rm(server.config.logDirs)) // restore log levels kafkaApisLogger.setLevel(Level.ERROR) @@ -91,12 +91,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { } private def startBrokers(cluster: Seq[Properties]) { - for (props <- cluster) { - val config = KafkaConfig.fromProps(props) - val server = createServer(config) - configs ++= List(config) - servers ++= List(server) - } + servers = cluster.par.map(props => createServer(KafkaConfig.fromProps(props))).toList.sortBy(_.config.brokerId) } def testUncleanLeaderElectionEnabled { @@ -110,9 +105,9 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { } def testUncleanLeaderElectionDisabled { - // disable unclean leader election - configProps1.put("unclean.leader.election.enable", String.valueOf(false)) - configProps2.put("unclean.leader.election.enable", String.valueOf(false)) + // disable unclean leader election + configProps1.put("unclean.leader.election.enable", String.valueOf(false)) + configProps2.put("unclean.leader.election.enable", String.valueOf(false)) startBrokers(Seq(configProps1, configProps2)) // create topic with 1 partition, 2 replicas, one on each broker diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index 18361c1..aa9a2dd 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -23,7 +23,7 @@ import kafka.utils.{TestUtils, Utils, Logging} import kafka.api.FetchRequestBuilder import kafka.producer.async.MissingConfigException import kafka.serializer.Encoder -import kafka.zk.ZooKeeperTestHarness +import kafka.integration.KafkaServerTestHarness import java.util.Properties import java.io.File @@ -35,35 +35,25 @@ import org.scalatest.junit.JUnit3Suite import junit.framework.Assert._ -class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { +class KafkaLog4jAppenderTest extends JUnit3Suite with KafkaServerTestHarness with Logging { - var logDirZk: File = null - var config: KafkaConfig = null - var server: KafkaServer = null + val numNodes = 1 + def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps) var simpleConsumerZk: SimpleConsumer = null val tLogger = Logger.getLogger(getClass()) - private val brokerZk = 0 @Before override def setUp() { super.setUp() - - val propsZk = TestUtils.createBrokerConfig(brokerZk, zkConnect) - val logDirZkPath = propsZk.getProperty("log.dir") - logDirZk = new File(logDirZkPath) - config = KafkaConfig.fromProps(propsZk) - server = TestUtils.createServer(config) - simpleConsumerZk = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64 * 1024, "") + simpleConsumerZk = new SimpleConsumer("localhost", servers.head.boundPort(), 1000000, 64 * 1024, "") } @After override def tearDown() { simpleConsumerZk.close - server.shutdown - Utils.rm(logDirZk) super.tearDown() } @@ -91,7 +81,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") - props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromServers(Seq(server))) + props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromServers(servers)) props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") try { @@ -126,7 +116,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") - props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromServers(Seq(server))) + props.put("log4j.appender.KAFKA.BrokerList", TestUtils.getBrokerListStrFromServers(servers)) props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.appender.KAFKA.RequiredNumAcks", "1") props.put("log4j.appender.KAFKA.SyncSend", "true") @@ -140,4 +130,3 @@ class AppenderStringEncoder(encoding: String = "UTF-8") extends Encoder[LoggingE event.getMessage.toString.getBytes(encoding) } } - diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index a7ca142..b919121 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -22,7 +22,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer} -import kafka.zk.ZooKeeperTestHarness +import kafka.integration.KafkaServerTestHarness import org.apache.log4j.{Level, Logger} import org.junit.Test import kafka.utils._ @@ -36,46 +36,40 @@ import org.junit.Assert.assertEquals import kafka.common.{ErrorMapping, FailedToSendMessageException} import kafka.serializer.StringEncoder -class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ - private val brokerId1 = 0 - private val brokerId2 = 1 - private var server1: KafkaServer = null - private var server2: KafkaServer = null +class ProducerTest extends JUnit3Suite with KafkaServerTestHarness with Logging{ + val numNodes = 2 + val numParts = 4 + val overridingProps = new Properties() + overridingProps.put(KafkaConfig.NumPartitionsProp, numNodes.toString) + def generateConfigs() = + for (props <- TestUtils.createBrokerConfigs(numNodes, zkConnect)) + yield KafkaConfig.fromProps(props, overridingProps) + private var consumer1: SimpleConsumer = null private var consumer2: SimpleConsumer = null private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) - private var servers = List.empty[KafkaServer] + // Creation of consumers is deferred until they are actually needed. This allows us to kill brokers that use random // ports and then get a consumer instance that will be pointed at the correct port def getConsumer1() = { if (consumer1 == null) - consumer1 = new SimpleConsumer("localhost", server1.boundPort(), 1000000, 64*1024, "") + consumer1 = new SimpleConsumer("localhost", servers.head.boundPort(), 1000000, 64*1024, "") consumer1 } def getConsumer2() = { if (consumer2 == null) - consumer2 = new SimpleConsumer("localhost", server2.boundPort(), 100, 64*1024, "") + consumer2 = new SimpleConsumer("localhost", servers.last.boundPort(), 100, 64*1024, "") consumer2 } override def setUp() { super.setUp() - // set up 2 brokers with 4 partitions each - val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, false) - props1.put("num.partitions", "4") - val config1 = KafkaConfig.fromProps(props1) - val props2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, false) - props2.put("num.partitions", "4") - val config2 = KafkaConfig.fromProps(props2) - server1 = TestUtils.createServer(config1) - server2 = TestUtils.createServer(config2) - servers = List(server1,server2) val props = new Properties() props.put("host", "localhost") - props.put("port", server1.boundPort().toString) + props.put("port", servers.head.boundPort().toString) // temporarily set request handler logger to a higher level requestHandlerLogger.setLevel(Level.FATAL) @@ -90,10 +84,6 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ if (consumer2 != null) consumer2.close() - server1.shutdown - server2.shutdown - Utils.rm(server1.config.logDirs) - Utils.rm(server2.config.logDirs) super.tearDown() } @@ -122,7 +112,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } val producer2 = TestUtils.createProducer[String, String]( - brokerList = "localhost:80," + TestUtils.getBrokerListStrFromServers(Seq(server1)), + brokerList = "localhost:80," + TestUtils.getBrokerListStrFromServers(Seq(servers.head)), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName) @@ -135,7 +125,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } val producer3 = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), + brokerList = TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName) @@ -158,7 +148,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers) val producer1 = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), + brokerList = TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName, @@ -172,7 +162,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined) val leader = leaderOpt.get - val messageSet = if(leader == server1.config.brokerId) { + val messageSet = if(leader == servers.head.config.brokerId) { val response1 = getConsumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) response1.messageSet("new-topic", 0).iterator.toBuffer }else { @@ -191,7 +181,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ try { val producer2 = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), + brokerList = TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName, @@ -221,7 +211,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ servers = servers) val producer = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), + brokerList = TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName, @@ -236,8 +226,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } // kill the broker - server1.shutdown - server1.awaitShutdown() + servers.head.shutdown + servers.head.awaitShutdown() try { // These sends should fail since there are no available brokers @@ -249,7 +239,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } // restart server 1 - server1.startup() + servers.head.startup() TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) @@ -275,7 +265,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("message.send.max.retries", "0") props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout") val producer = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), + brokerList = TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName, @@ -300,7 +290,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // stop IO threads and request handling, but leave networking operational // any requests should be accepted and queue up, but not handled - server1.requestHandlerPool.shutdown() + servers.head.requestHandlerPool.shutdown() val t1 = SystemTime.milliseconds try { @@ -322,7 +312,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ @Test def testSendNullMessage() { val producer = TestUtils.createProducer[String, String]( - brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)), + brokerList = TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName) diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala index b011240..936c078 100644 --- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -18,35 +18,28 @@ package kafka.server import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness +import kafka.integration.KafkaServerTestHarness import junit.framework.Assert._ import kafka.utils.{ZkUtils, Utils, TestUtils} +import java.util.Properties -class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { - var server : KafkaServer = null - val brokerId = 0 +class AdvertiseBrokerTest extends JUnit3Suite with KafkaServerTestHarness { + val numNodes = 1 val advertisedHostName = "routable-host" val advertisedPort = 1234 - override def setUp() { - super.setUp() - val props = TestUtils.createBrokerConfig(brokerId, zkConnect) - props.put("advertised.host.name", advertisedHostName) - props.put("advertised.port", advertisedPort.toString) - - server = TestUtils.createServer(KafkaConfig.fromProps(props)) - } + val overridingProps = new Properties() + overridingProps.put(KafkaConfig.AdvertisedHostNameProp, advertisedHostName) + overridingProps.put(KafkaConfig.AdvertisedPortProp, advertisedPort.toString) + + def generateConfigs() = + for (props <- TestUtils.createBrokerConfigs(numNodes, zkConnect)) + yield KafkaConfig.fromProps(props, overridingProps) - override def tearDown() { - server.shutdown() - Utils.rm(server.config.logDirs) - super.tearDown() - } - def testBrokerAdvertiseToZK { - val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId) + val brokerInfo = ZkUtils.getBrokerInfo(zkClient, servers.head.config.brokerId) assertEquals(advertisedHostName, brokerInfo.get.host) assertEquals(advertisedPort, brokerInfo.get.port) } - -} \ No newline at end of file + +} diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 3d4258f..3eade52 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -18,7 +18,7 @@ package kafka.server import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness +import kafka.integration.KafkaServerTestHarness import kafka.utils.TestUtils._ import junit.framework.Assert._ import kafka.utils.{ZkUtils, Utils, TestUtils} @@ -27,32 +27,14 @@ import kafka.cluster.Broker import kafka.common.ErrorMapping import kafka.api._ -class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { +class LeaderElectionTest extends JUnit3Suite with KafkaServerTestHarness { val brokerId1 = 0 val brokerId2 = 1 - var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] - + val numNodes = 2 + def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect, enableControlledShutdown = true).map(KafkaConfig.fromProps) var staleControllerEpochDetected = false - override def setUp() { - super.setUp() - - val configProps1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, enableControlledShutdown = false) - val configProps2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, enableControlledShutdown = false) - - // start both servers - val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1)) - val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2)) - servers ++= List(server1, server2) - } - - override def tearDown() { - servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDirs)) - super.tearDown() - } - def testLeaderElectionAndEpoch { // start 2 brokers val topic = "new-topic" diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 496bf0d..b4ab8f4 100644 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -23,7 +23,7 @@ import junit.framework.Assert._ import java.util.{Random, Properties} import kafka.consumer.SimpleConsumer import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} -import kafka.zk.ZooKeeperTestHarness +import kafka.integration.KafkaServerTestHarness import org.scalatest.junit.JUnit3Suite import kafka.admin.AdminUtils import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} @@ -33,31 +33,24 @@ import org.junit.After import org.junit.Before import org.junit.Test -class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { - val random = new Random() - var logDir: File = null +class LogOffsetTest extends JUnit3Suite with KafkaServerTestHarness { + val random = new Random() var topicLogDir: File = null var server: KafkaServer = null var logSize: Int = 100 + def generateConfigs() = List(KafkaConfig.fromProps(createBrokerConfig(1))) var simpleConsumer: SimpleConsumer = null var time: Time = new MockTime() @Before override def setUp() { super.setUp() - val config: Properties = createBrokerConfig(1) - val logDirPath = config.getProperty("log.dir") - logDir = new File(logDirPath) - time = new MockTime() - server = TestUtils.createServer(KafkaConfig.fromProps(config), time) - simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "") + simpleConsumer = new SimpleConsumer("localhost", servers.head.boundPort(), 1000000, 64*1024, "") } @After override def tearDown() { simpleConsumer.close - server.shutdown - Utils.rm(logDir) super.tearDown() } @@ -80,7 +73,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { // setup brokers in zookeeper as owners of partitions for this test AdminUtils.createTopic(zkClient, topic, 1, 1) - val logManager = server.getLogManager + val logManager = servers.head.getLogManager waitUntilTrue(() => logManager.getLog(TopicAndPartition(topic, part)).isDefined, "Log for partition [topic,0] should be created") val log = logManager.getLog(TopicAndPartition(topic, part)).get @@ -90,10 +83,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) log.flush() - val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10) + val offsets = servers.head.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10) assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0), offsets) - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") + waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, servers.head), "Leader should be elected") val topicAndPartition = TopicAndPartition(topic, part) val offsetRequest = OffsetRequest( Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)), @@ -118,7 +111,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val topic = topicPartition.split("-").head // setup brokers in zookeeper as owners of partitions for this test - createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server)) + createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) var offsetChanged = false for(i <- 1 to 14) { @@ -144,7 +137,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { // setup brokers in zookeeper as owners of partitions for this test AdminUtils.createTopic(zkClient, topic, 3, 1) - val logManager = server.getLogManager + val logManager = servers.head.getLogManager val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig) val message = new Message(Integer.toString(42).getBytes()) for(i <- 0 until 20) @@ -153,10 +146,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs - val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10) + val offsets = servers.head.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10) assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), offsets) - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") + waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, servers.head), "Leader should be elected") val topicAndPartition = TopicAndPartition(topic, part) val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0) val consumerOffsets = @@ -173,18 +166,18 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { // setup brokers in zookeeper as owners of partitions for this test AdminUtils.createTopic(zkClient, topic, 3, 1) - val logManager = server.getLogManager + val logManager = servers.head.getLogManager val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig) val message = new Message(Integer.toString(42).getBytes()) for(i <- 0 until 20) log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) log.flush() - val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.EarliestTime, 10) + val offsets = servers.head.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.EarliestTime, 10) assertEquals(Seq(0L), offsets) - waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected") + waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, servers.head), "Leader should be elected") val topicAndPartition = TopicAndPartition(topic, part) val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10))) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 92e49df..28d6d5e 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -20,7 +20,7 @@ import java.util.Properties import kafka.utils.TestUtils._ import kafka.utils.{IntEncoder, Utils, TestUtils} -import kafka.zk.ZooKeeperTestHarness +import kafka.integration.KafkaServerTestHarness import kafka.common._ import kafka.producer.{KeyedMessage, Producer} import kafka.serializer.StringEncoder @@ -30,7 +30,7 @@ import java.io.File import org.scalatest.junit.JUnit3Suite import org.junit.Assert._ -class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { +class LogRecoveryTest extends JUnit3Suite with KafkaServerTestHarness { val replicaLagTimeMaxMs = 5000L val replicaLagMaxMessages = 10L @@ -42,7 +42,12 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString) overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString) - var configs: Seq[KafkaConfig] = null + val numNodes = 2 + def generateConfigs() = + for (props <- TestUtils.createBrokerConfigs(numNodes, zkConnect)) + yield KafkaConfig.fromProps(props, overridingProps) + + val topic = "new-topic" val partitionId = 0 @@ -57,7 +62,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { var producer: Producer[Int, String] = null def hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename)) def hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename)) - var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] // Some tests restart the brokers then produce more data. But since test brokers use random ports, we need // to use a new producer that knows the new ports @@ -72,12 +76,10 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { override def setUp() { super.setUp() - configs = TestUtils.createBrokerConfigs(2, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps)) // start both servers - server1 = TestUtils.createServer(configProps1) - server2 = TestUtils.createServer(configProps2) - servers = List(server1, server2) + server1 = servers.head + server2 = servers.last // create topic with 1 partition, 2 replicas, one on each broker createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) @@ -88,10 +90,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { producer.close() - for(server <- servers) { - server.shutdown() - Utils.rm(server.config.logDirs(0)) - } super.tearDown() } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index a67cc37..66ff35a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -18,29 +18,20 @@ package kafka.server import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness +import kafka.integration.KafkaServerTestHarness import kafka.utils.TestUtils._ import kafka.producer.KeyedMessage import kafka.serializer.StringEncoder import kafka.utils.{TestUtils} import kafka.common._ -class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { - var brokers: Seq[KafkaServer] = null +class ReplicaFetchTest extends JUnit3Suite with KafkaServerTestHarness { val topic1 = "foo" val topic2 = "bar" - override def setUp() { - super.setUp() - brokers = createBrokerConfigs(2, zkConnect, false) - .map(KafkaConfig.fromProps) - .map(config => TestUtils.createServer(config)) - } + val numNodes = 2 + def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps) - override def tearDown() { - brokers.foreach(_.shutdown()) - super.tearDown() - } def testReplicaFetcherThread() { val partition = 0 @@ -49,11 +40,11 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { // create a topic and partition and await leadership for (topic <- List(topic1,topic2)) { - createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = brokers) + createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers) } // send test messages to leader - val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(brokers), + val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(servers), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName) val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m)) @@ -64,8 +55,8 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { var result = true for (topic <- List(topic1, topic2)) { val topicAndPart = TopicAndPartition(topic, partition) - val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset - result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total && + val expectedOffset = servers.head.getLogManager().getLog(topicAndPart).get.logEndOffset + result = result && expectedOffset > 0 && servers.foldLeft(true) { (total, item) => total && (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset) } } result diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala index 2bfaeb3..2c5573d 100644 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -18,107 +18,73 @@ package kafka.server import java.util.Properties -import kafka.zk.ZooKeeperTestHarness +import kafka.integration.KafkaServerTestHarness import kafka.utils.{TestUtils, Utils} import org.junit.Test import org.scalatest.junit.JUnit3Suite import junit.framework.Assert._ import java.io.File -class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { - var props1: Properties = null - var config1: KafkaConfig = null - var props2: Properties = null - var config2: KafkaConfig = null - val brokerMetaPropsFile = "meta.properties" +class ServerGenerateBrokerIdTest extends JUnit3Suite with KafkaServerTestHarness { + def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)), + KafkaConfig.fromProps(TestUtils.createBrokerConfig(-1, zkConnect)), + KafkaConfig.fromProps(TestUtils.createBrokerConfig(-1, zkConnect))) - override def setUp() { - super.setUp() - props1 = TestUtils.createBrokerConfig(-1, zkConnect) - config1 = KafkaConfig.fromProps(props1) - props2 = TestUtils.createBrokerConfig(0, zkConnect) - config2 = KafkaConfig.fromProps(props2) - } + val brokerMetaPropsFile = "meta.properties" @Test def testAutoGenerateBrokerId() { - var server1 = new KafkaServer(config1) - server1.startup() - server1.shutdown() - assertTrue(verifyBrokerMetadata(config1.logDirs, 1001)) + servers.head.shutdown() + assertTrue(verifyBrokerMetadata(servers.last.config.logDirs, 1002)) // restart the server check to see if it uses the brokerId generated previously - server1 = new KafkaServer(config1) - server1.startup() - assertEquals(server1.config.brokerId, 1001) - server1.shutdown() - Utils.rm(server1.config.logDirs) - TestUtils.verifyNonDaemonThreadsStatus + servers.head.startup() + assertEquals(servers.last.config.brokerId, 1002) } @Test def testUserConfigAndGeneratedBrokerId() { // start the server with broker.id as part of config - val server1 = new KafkaServer(config1) - val server2 = new KafkaServer(config2) - val props3 = TestUtils.createBrokerConfig(-1, zkConnect) - val config3 = KafkaConfig.fromProps(props3) - val server3 = new KafkaServer(config3) - server1.startup() - assertEquals(server1.config.brokerId,1001) - server2.startup() - assertEquals(server2.config.brokerId,0) - server3.startup() - assertEquals(server3.config.brokerId,1002) - server1.shutdown() - server2.shutdown() - server3.shutdown() - assertTrue(verifyBrokerMetadata(server1.config.logDirs,1001)) - assertTrue(verifyBrokerMetadata(server2.config.logDirs,0)) - assertTrue(verifyBrokerMetadata(server3.config.logDirs,1002)) - Utils.rm(server1.config.logDirs) - Utils.rm(server2.config.logDirs) - Utils.rm(server3.config.logDirs) - TestUtils.verifyNonDaemonThreadsStatus + val expectedBrokerIds = List(0, 1001, 1002) + (servers zip expectedBrokerIds).map { + case (server, expectedBrokerId) => + assertEquals(expectedBrokerId, server.config.brokerId) + assertTrue(verifyBrokerMetadata(server.config.logDirs, expectedBrokerId)) + } } @Test def testMultipleLogDirsMetaProps() { // add multiple logDirs and check if the generate brokerId is stored in all of them - val logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath + + var server = servers.last + assertEquals(server.config.brokerId, 1002) + val logDirs = server.config.logDirs.mkString(",") + "," + TestUtils.tempDir().getAbsolutePath + "," + TestUtils.tempDir().getAbsolutePath - props1.setProperty("log.dir",logDirs) - config1 = KafkaConfig.fromProps(props1) - var server1 = new KafkaServer(config1) - server1.startup() - server1.shutdown() - assertTrue(verifyBrokerMetadata(config1.logDirs, 1001)) - // addition to log.dirs after generation of a broker.id from zk should be copied over - val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath - props1.setProperty("log.dir",newLogDirs) - config1 = KafkaConfig.fromProps(props1) - server1 = new KafkaServer(config1) - server1.startup() - server1.shutdown() - assertTrue(verifyBrokerMetadata(config1.logDirs, 1001)) - Utils.rm(server1.config.logDirs) - TestUtils.verifyNonDaemonThreadsStatus + val kafkaConfigProps = TestUtils.createBrokerConfig(-1, zkConnect) + kafkaConfigProps.setProperty("log.dir", logDirs) + val kafkaConfig = KafkaConfig.fromProps(kafkaConfigProps) + server.shutdown() + server = new KafkaServer(kafkaConfig) + server.startup() + assertTrue(verifyBrokerMetadata(server.config.logDirs, 1002)) + server.shutdown() + Utils.rm(server.config.logDirs) } @Test def testConsistentBrokerIdFromUserConfigAndMetaProps() { // check if configured brokerId and stored brokerId are equal or throw InconsistentBrokerException - var server1 = new KafkaServer(config1) //auto generate broker Id - server1.startup() - server1.shutdown() - server1 = new KafkaServer(config2) // user specified broker id + var server = servers.last + val kafkaConfigProps = TestUtils.createBrokerConfig(0, zkConnect) + kafkaConfigProps.setProperty("log.dir", server.config.logDirs.mkString(",")) + server.shutdown() + server = new KafkaServer(KafkaConfig.fromProps(kafkaConfigProps)) // user specified broker id try { - server1.startup() + server.startup() } catch { case e: kafka.common.InconsistentBrokerIdException => //success } - server1.shutdown() - Utils.rm(server1.config.logDirs) - TestUtils.verifyNonDaemonThreadsStatus + server.shutdown() + Utils.rm(server.config.logDirs) } def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index f451825..9f043a4 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -132,7 +132,7 @@ object TestUtils extends Logging { */ def createBrokerConfigs(numConfigs: Int, zkConnect: String, - enableControlledShutdown: Boolean = true, + enableControlledShutdown: Boolean = false, enableDeleteTopic: Boolean = false): Seq[Properties] = { (0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic)) } @@ -145,7 +145,7 @@ object TestUtils extends Logging { * Create a test config for the given node id */ def createBrokerConfig(nodeId: Int, zkConnect: String, - enableControlledShutdown: Boolean = true, + enableControlledShutdown: Boolean = false, enableDeleteTopic: Boolean = false, port: Int = RandomPort): Properties = { val props = new Properties -- 1.9.5 (Apple Git-50.3)