Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1367372)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
@@ -645,7 +646,64 @@
     Utils.rm(server3.config.logDir)
   }
 
+  @Test
+  def testPartitionedSendToNewBrokerWithNoExistingTopics() {
+    val props = new Properties()
+    props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+    val partitioner = new StaticPartitioner
+    val serializer = new StringSerializer
+
+    // 3 sync producers; only the one for the new broker (id 2) should see a send
+    val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+    val syncProducer1 = EasyMock.createMock("producer1", classOf[SyncProducer])
+    val syncProducer2 = EasyMock.createMock("producer2", classOf[SyncProducer])
+    val syncProducer3 = EasyMock.createMock("producer3", classOf[SyncProducer])
+    syncProducer3.send("test-topic", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+                                                                 messages = new Message("test1".getBytes)))
+    EasyMock.expectLastCall
+    syncProducer1.close
+    EasyMock.expectLastCall
+    syncProducer2.close
+    EasyMock.expectLastCall
+    syncProducer3.close
+    EasyMock.expectLastCall
+    EasyMock.replay(syncProducer1)
+    EasyMock.replay(syncProducer2)
+    EasyMock.replay(syncProducer3)
+
+    syncProducers.put(brokerId1, syncProducer1)
+    syncProducers.put(brokerId2, syncProducer2)
+    syncProducers.put(2, syncProducer3)
+
+    val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
+
+    val port = TestUtils.choosePort
+    val serverProps = TestUtils.createBrokerConfig(2, port)
+    val serverConfig = new KafkaConfig(serverProps) {
+      override val numPartitions = 4
+    }
+
+    val server3 = TestUtils.createServer(serverConfig)
+    Thread.sleep(500)
+
+    producer.send(new ProducerData[String, String]("test-topic", "test-top", Array("test1")))
+    producer.close
+
+    EasyMock.verify(syncProducer1)
+    EasyMock.verify(syncProducer2)
+    EasyMock.verify(syncProducer3)
+
+    server3.shutdown
+    Utils.rm(server3.config.logDir)
+  }
+
+
   @Test
   def testDefaultPartitioner() {
     val props = new Properties()
     props.put("serializer.class", "kafka.producer.StringSerializer")
Index: core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala	(revision 1367372)
+++ core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala	(working copy)
@@ -137,17 +137,23 @@
   }
 
   private def bootstrapWithExistingBrokers(topic: String): scala.collection.immutable.SortedSet[Partition] = {
-    debug("Currently, no brokers are registered under topic: " + topic)
+    debug("Currently, no brokers are registered under topic: " + topic)
     debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
       "number of partitions = 1")
     val allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath)
     trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
-    // since we do not have the in formation about number of partitions on these brokers, just assume single partition
+
+    bootstrapWithBrokers(topic, allBrokersIds.map(_.toInt))
+  }
+
+  private def bootstrapWithBrokers(topic: String, brokerIds: Iterable[Int]): scala.collection.immutable.SortedSet[Partition] = {
+    // since we do not have the information about number of partitions on these brokers, just assume single partition
    // i.e. pick partition 0 from each broker as a candidate
-    val numBrokerPartitions = TreeSet[Partition]() ++ allBrokersIds.map(b => new Partition(b.toInt, 0))
+    val numBrokerPartitions = TreeSet[Partition]() ++ brokerIds.map(new Partition(_, 0))
     // add the rest of the available brokers with default 1 partition for this topic, so all of the brokers
     // participate in hosting this topic.
     debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
+
     numBrokerPartitions
   }
 
@@ -270,6 +276,18 @@
           debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid)
           producerCbk(bid, brokerHostPort(1), brokerHostPort(2).toInt)
         }
+
+        if(!newBrokers.isEmpty) {
+          // bootstrap all known topics onto the newly registered brokers as well
+          topicBrokerPartitions ++= topicBrokerPartitions.keySet.map { topic =>
+            val numBrokerPartitions = bootstrapWithBrokers(topic, newBrokers)
+            val oldNumBrokerPartitions = topicBrokerPartitions.get(topic)
+            val allPartitions = oldNumBrokerPartitions.map(numBrokerPartitions ++ _).getOrElse(numBrokerPartitions)
+            (topic -> allPartitions)
+          }
+        }
+
+        // remove dead brokers from the in-memory list of live brokers
         val deadBrokers = oldBrokerIdMap.keySet &~ updatedBrokerList.toSet
         debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString)
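
Note on the ZKBrokerPartitionInfo change: the merge the BrokerTopicsListener performs is easier to follow outside diff form. Below is a minimal, self-contained Scala sketch of that logic, assuming a simplified Partition stand-in (the real class is kafka.cluster.Partition) and invented sample data; it is an illustration of the technique, not code from the patch.

    import scala.collection.immutable.TreeSet
    import scala.collection.mutable

    // Simplified stand-in for kafka.cluster.Partition (assumption for this sketch).
    case class Partition(brokerId: Int, partId: Int) extends Ordered[Partition] {
      def compare(that: Partition): Int =
        if (brokerId != that.brokerId) brokerId compare that.brokerId
        else partId compare that.partId
    }

    object NewBrokerBootstrapSketch {
      // Mirrors bootstrapWithBrokers: with no partition metadata for a broker yet,
      // assume a single partition (partition 0) per broker.
      def bootstrapWithBrokers(brokerIds: Iterable[Int]): TreeSet[Partition] =
        TreeSet[Partition]() ++ brokerIds.map(Partition(_, 0))

      def main(args: Array[String]): Unit = {
        // Existing state: "test-topic" is already hosted on brokers 0 and 1.
        val topicBrokerPartitions =
          mutable.Map("test-topic" -> TreeSet(Partition(0, 0), Partition(1, 0)))

        // Broker 2 registers in zookeeper before any topic is created on it.
        val newBrokers = List(2)

        // Fold partition 0 of each new broker into every known topic, as the
        // listener change does, so producers can route to it right away.
        topicBrokerPartitions ++= topicBrokerPartitions.keySet.map { topic =>
          val bootstrapped = bootstrapWithBrokers(newBrokers)
          val merged = topicBrokerPartitions.get(topic)
            .map(bootstrapped ++ _)
            .getOrElse(bootstrapped)
          topic -> merged
        }

        // "test-topic" now maps to TreeSet(Partition(0,0), Partition(1,0), Partition(2,0)).
        println(topicBrokerPartitions)
      }
    }

This is the same scenario testPartitionedSendToNewBrokerWithNoExistingTopics exercises end to end: broker 2 joins with no existing topics, and the producer is then expected to be able to send "test-topic" messages to it.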