Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1186489)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -155,8 +155,8 @@
     // 2 sync producers
     val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
     val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
-    // it should send to partition 0 due to the StaticPartitioner
-    syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
+    // it should send to a random partition due to use of broker.list
+    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
     EasyMock.expectLastCall
     syncProducer1.close
     EasyMock.expectLastCall
@@ -373,8 +373,8 @@
     // 2 async producers
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", 0)
+    // it should send to a random partition due to use of broker.list
+    asyncProducer1.send(topic, "test1", -1)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
@@ -646,8 +646,8 @@
     // 2 async producers
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", 0)
+    // it should send to a random partition due to use of broker.list
+    asyncProducer1.send(topic, "test1", -1)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
Index: core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala	(revision 1186489)
+++ core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala	(working copy)
@@ -169,10 +169,10 @@
     // 2 sync producers
     val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
     val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
-    // it should send to partition 0 due to the StaticPartitioner
+    // it should send to a random partition due to use of broker.list
     val messageList = new java.util.ArrayList[Message]
     messageList.add(new Message("t".getBytes()))
-    syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
     EasyMock.expectLastCall
     syncProducer1.close
     EasyMock.expectLastCall
@@ -367,8 +367,8 @@
     // 2 async producers
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", 0)
+    // it should send to a random partition due to use of broker.list
+    asyncProducer1.send(topic, "test1", -1)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
@@ -583,8 +583,8 @@
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
    val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", 0)
+    // it should send to a random partition due to use of broker.list
+    asyncProducer1.send(topic, "test1", -1)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala	(revision 1186489)
+++ core/src/main/scala/kafka/producer/Producer.scala	(working copy)
@@ -24,6 +24,7 @@
 import kafka.cluster.{Partition, Broker}
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
+import kafka.api.ProducerRequest
 
 class Producer[K,V](config: ProducerConfig,
                     partitioner: Partitioner[K],
@@ -118,11 +119,11 @@
         brokerPartitionInfo.updateInfo
       }
 
-      val numBrokerPartitions = getNumPartitionsForTopic(pd)
-      val totalNumPartitions = numBrokerPartitions.length
+      val topicPartitionsList = getPartitionListForTopic(pd)
+      val totalNumPartitions = topicPartitionsList.length
 
       val partitionId = getPartition(pd.getKey, totalNumPartitions)
-      brokerIdPartition = Some(numBrokerPartitions(partitionId))
+      brokerIdPartition = Some(topicPartitionsList(partitionId))
       brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.get.brokerId)
       numRetries += 1
     }
@@ -145,32 +146,36 @@
   private def configSend(producerData: ProducerData[K,V]*) {
     val producerPoolRequests = producerData.map { pd =>
       // find the broker partitions registered for this topic
-      val numBrokerPartitions = getNumPartitionsForTopic(pd)
-      val totalNumPartitions = numBrokerPartitions.length
+      val topicPartitionsList = getPartitionListForTopic(pd)
+      val totalNumPartitions = topicPartitionsList.length
-      val partitionId = getPartition(pd.getKey, totalNumPartitions)
-      val brokerIdPartition = numBrokerPartitions(partitionId)
+      val randomBrokerId = random.nextInt(totalNumPartitions)
+      val brokerIdPartition = topicPartitionsList(randomBrokerId)
       val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
 
       if(logger.isDebugEnabled)
+        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+          " on a randomly chosen partition")
+      val partition = ProducerRequest.RandomPartition
+      if(logger.isDebugEnabled)
         logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
           " on a partition " + brokerIdPartition.partId)
       producerPool.getProducerPoolData(pd.getTopic,
-                                       new Partition(brokerIdPartition.brokerId, brokerIdPartition.partId),
+                                       new Partition(brokerIdPartition.brokerId, partition),
                                        pd.getData)
     }
     producerPool.send(producerPoolRequests: _*)
   }
 
-  private def getNumPartitionsForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
+  private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
     if(logger.isDebugEnabled)
       logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
-    val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
+    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
     if(logger.isDebugEnabled)
-      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
-    val totalNumPartitions = numBrokerPartitions.length
+      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
+    val totalNumPartitions = topicPartitionsList.length
     if(totalNumPartitions == 0)
       throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
-    numBrokerPartitions
+    topicPartitionsList
   }
 
   /**