diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 686a0df..bdc72ea 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -57,7 +57,7 @@ object TopicCommand { else if(opts.options.has(opts.describeOpt)) describeTopic(zkClient, opts) } catch { - case e => + case e: Throwable => println("Error while executing topic command " + e.getMessage) println(Utils.stackTrace(e)) } finally { diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index c95c650..919aeb2 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -132,7 +132,7 @@ class RequestSendThread(val controllerId: Int, channel.send(request) isSendSuccessful = true } catch { - case e => // if the send was not successful, reconnect to broker and resend the message + case e: Throwable => // if the send was not successful, reconnect to broker and resend the message error(("Controller %d epoch %d failed to send %s request with correlation id %s to broker %s. " + "Reconnecting to broker.").format(controllerId, controllerContext.epoch, RequestKeys.nameForKey(request.requestId.get), request.correlationId, toBroker.toString()), e) @@ -173,7 +173,7 @@ class RequestSendThread(val controllerId: Int, channel.connect() info("Controller %d connected to %s for sending state change requests".format(controllerId, broker.toString())) } catch { - case e => { + case e: Throwable => { channel.disconnect() error("Controller %d's connection to broker %s was unsuccessful".format(controllerId, broker.toString()), e) } diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index a649461..91f0728 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -302,7 +302,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } else isMessageInAllReplicas = false } catch { - case t => + case t: Throwable => throw new RuntimeException("Error in processing replica %d in partition %s at offset %d." .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) } diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 440aed8..ed98703 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -129,8 +129,8 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(leader2.get, leader2FromZk) // read metadata from a broker and verify the new topic partitions exist - TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2, 5000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1) + TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2) val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions", 2000,0).topicsMetadata val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1)) @@ -154,8 +154,8 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(leader2.get, leader2FromZk) // read metadata from a broker and verify the new topic partitions exist - TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2, 5000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1) + TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2) val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas", 2000,0).topicsMetadata val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2)) @@ -173,12 +173,12 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.addPartitions(zkClient, topic3, 7) // read metadata from a broker and verify the new topic partitions exist - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5, 5000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6, 5000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6) val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement", 2000,0).topicsMetadata diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 8991050..3a6c5ff 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -310,8 +310,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // create the topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition, 1000) + TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers) val controllerId = ZkUtils.getController(zkClient) val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index e93305a..2b8e16c 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -25,7 +25,6 @@ import scala.collection._ import org.scalatest.junit.JUnit3Suite import kafka.message._ import kafka.serializer._ -import kafka.admin.AdminUtils import org.I0Itec.zkclient.ZkClient import kafka.utils._ import kafka.producer.{ProducerConfig, KeyedMessage, Producer} @@ -97,8 +96,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -176,8 +175,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) // create a consumer val consumerConfig1 = new ConsumerConfig( @@ -249,8 +248,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++ sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) @@ -275,8 +274,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++ sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -310,13 +309,11 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer) // create topic topic1 with 1 partition on broker 0 - AdminUtils.createTopic(zkClient, topic, 1, 1) + TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) // send some messages to each broker val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 1415773..7125ec9 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -109,7 +109,7 @@ kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow FAILED for(i <- 0 until numMessages) producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes)) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) // update offset in zookeeper for consumer to jump "forward" in time val dirs = new ZKGroupTopicDirs(group, topic) diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 60a466f..6d489ad 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -209,9 +209,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testConsumerEmptyTopic() { val newTopic = "new-topic" - AdminUtils.createTopic(zkClient, newTopic, 1, 1) - TestUtils.waitUntilMetadataIsPropagated(servers, newTopic, 0, 1000) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0) + TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers) + val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build()) assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) } diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 761f759..e82110b 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -65,9 +65,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testBasicTopicMetadata { // create topic val topic = "test" - AdminUtils.createTopic(zkClient, topic, 1, 1) - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0, 1000) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) + TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) + var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", 2000,0).topicsMetadata assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) @@ -84,12 +83,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { // create topic val topic1 = "testGetAllTopicMetadata1" val topic2 = "testGetAllTopicMetadata2" - AdminUtils.createTopic(zkClient, topic1, 1, 1) - AdminUtils.createTopic(zkClient, topic2, 1, 1) - - // wait for leader to be elected for both topics - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0, 1000) - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic2, 0, 1000) + TestUtils.createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) + TestUtils.createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) // issue metadata request with empty list of topics var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata", @@ -120,7 +115,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { // wait for leader to be elected TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0) // retry the metadata for the auto created topic topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index bdc6f01..6c3feac 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -169,7 +169,7 @@ class AsyncProducerTest extends JUnit3Suite { props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val broker1 = new Broker(0, "localhost", 9092) val broker2 = new Broker(1, "localhost", 9093) - broker1 + // form expected partitions metadata val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2)) val partition2Metadata = new PartitionMetadata(1, Some(broker2), List(broker1, broker2)) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 439e33e..c1219a8 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -86,10 +86,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ @Test def testUpdateBrokerPartitionInfo() { val topic = "new-topic" - AdminUtils.createTopic(zkClient, topic, 1, 2) - // wait until the update metadata request for new topic reaches all servers - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 500) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers) val props1 = new util.Properties() props1.put("metadata.broker.list", "localhost:80,localhost:81") @@ -152,9 +149,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val topic = "new-topic" // create topic with 1 partition and await leadership - AdminUtils.createTopic(zkClient, topic, 1, 2) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers) val producer1 = new Producer[String, String](producerConfig1) val producer2 = new Producer[String, String](producerConfig2) @@ -183,7 +178,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ fail("Should have timed out for 3 acks.") } catch { - case se: FailedToSendMessageException => true + case se: FailedToSendMessageException => + // this is expected case e: Throwable => fail("Not expected", e) } finally { @@ -203,13 +199,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val topic = "new-topic" // create topic - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0))) - // waiting for 1 partition is enough - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 2) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 3) + TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)), + servers = servers) val config = new ProducerConfig(props) val producer = new Producer[String, String](config) @@ -266,9 +257,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val topic = "new-topic" // create topics in ZK - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(0->Seq(0,1))) - TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) // do a simple test to make sure plumbing is okay try { diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 4840824..0dec9ec 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -92,8 +92,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val props = TestUtils.getSyncProducerConfig(server.socketServer.port) val producer = new SyncProducer(new SyncProducerConfig(props)) - AdminUtils.createTopic(zkClient, "test", 1, 1) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0) + TestUtils.createTopic(zkClient, "test", numPartitions = 1, replicationFactor = 1, servers = servers) val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1)) val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 76ae659..068c626 100644 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -22,7 +22,6 @@ import kafka.utils._ import junit.framework.Assert._ import java.util.{Random, Properties} import kafka.consumer.SimpleConsumer -import org.junit.{After, Before, Test} import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite @@ -30,8 +29,6 @@ import kafka.admin.AdminUtils import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} import kafka.utils.TestUtils._ import kafka.common.{ErrorMapping, TopicAndPartition} -import kafka.utils.nonthreadsafe -import kafka.utils.threadsafe import org.junit.After import org.junit.Before import org.junit.Test @@ -123,8 +120,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val topic = topicPartition.split("-").head // setup brokers in zookeeper as owners of partitions for this test - AdminUtils.createTopic(zkClient, topic, 1, 1) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server)) var offsetChanged = false for(i <- 1 to 14) { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index 5305167..3b2814e 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -22,7 +22,6 @@ import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import kafka.producer.KeyedMessage import kafka.serializer.StringEncoder -import kafka.admin.AdminUtils import kafka.utils.TestUtils import junit.framework.Assert._ import kafka.common._ @@ -51,8 +50,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { // create a topic and partition and await leadership for (topic <- List(topic1,topic2)) { - AdminUtils.createTopic(zkClient, topic, 1, 2) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) + TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = brokers) } // send test messages to leader diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 1651822..5357c84 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -26,7 +26,6 @@ import kafka.zk.ZooKeeperTestHarness import kafka.producer._ import kafka.utils.IntEncoder import kafka.utils.TestUtils._ -import kafka.admin.AdminUtils import kafka.api.FetchRequestBuilder import kafka.utils.{TestUtils, Utils} @@ -49,8 +48,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { var producer = new Producer[Int, String](new ProducerConfig(producerConfig)) // create topic - AdminUtils.createTopic(zkClient, topic, 1, 1) - TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000) + TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server)) // send some messages producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*) @@ -69,7 +67,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { server.startup() // wait for the broker to receive the update metadata request after startup - TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000) + TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0) producer = new Producer[Int, String](new ProducerConfig(producerConfig)) val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "") diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 4bd5964..130b6be 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -23,8 +23,6 @@ import java.nio._ import java.nio.channels._ import java.util.Random import java.util.Properties -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.TimeUnit import collection.mutable.Map import collection.mutable.ListBuffer @@ -150,15 +148,33 @@ object TestUtils extends Logging { } /** - * Create a topic in zookeeper + * Create a topic in zookeeper. + * Wait until the leader is elected and the metadata is propagated to all brokers. + * Return the leader for each partition. */ def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, - servers: List[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { + servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { // create topic AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor) // wait until the update metadata request for new topic reaches all servers (0 until numPartitions).map { case i => - TestUtils.waitUntilMetadataIsPropagated(servers, topic, i, 500) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, i) + i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i) + }.toMap + } + + /** + * Create a topic in zookeeper using a customized replica assignment. + * Wait until the leader is elected and the metadata is propagated to all brokers. + * Return the leader for each partition. + */ + def createTopic(zkClient: ZkClient, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]], + servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = { + // create topic + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaAssignment) + // wait until the update metadata request for new topic reaches all servers + partitionReplicaAssignment.keySet.map { case i => + TestUtils.waitUntilMetadataIsPropagated(servers, topic, i) i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i) }.toMap } @@ -553,8 +569,8 @@ object TestUtils extends Logging { byteBuffer } - def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long) = { - assertTrue("Partition [%s,%d] metadata not propagated after timeout".format(topic, partition), + def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int, timeout: Long = 5000L) = { + assertTrue("Partition [%s,%d] metadata not propagated after %d ms".format(topic, partition, timeout), TestUtils.waitUntilTrue(() => servers.foldLeft(true)(_ && _.apis.metadataCache.containsTopicAndPartition(topic, partition)), timeout)) }