Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1178446)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -156,7 +156,7 @@
     val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
     val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
     // it should send to random partition on broker 1
-    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
+    syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
     EasyMock.expectLastCall
     syncProducer1.close
     EasyMock.expectLastCall
@@ -374,7 +374,7 @@
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
@@ -610,9 +610,9 @@
     val serverConfig = new KafkaConfig(serverProps) {
       override val numPartitions = 4
     }
+    val server3 = TestUtils.createServer(serverConfig)
     Thread.sleep(500)
-
     // send a message to the new broker to register it under topic "test-topic"
     val tempProps = new Properties()
     tempProps.put("host", "localhost")
@@ -622,7 +622,6 @@
       messages = new Message("test".getBytes())))
     Thread.sleep(500)
-
     producer.send(new ProducerData[String, String]("test-topic", "test-topic", Array("test1")))
     producer.close
@@ -648,7 +647,7 @@
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
Index: core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala	(revision 1178446)
+++ core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala	(working copy)
@@ -172,7 +172,7 @@
     // it should send to random partition on broker 1
     val messageList = new java.util.ArrayList[Message]
     messageList.add(new Message("t".getBytes()))
-    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+    syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
     EasyMock.expectLastCall
     syncProducer1.close
     EasyMock.expectLastCall
@@ -368,7 +368,7 @@
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
@@ -554,7 +554,6 @@
     val messageList = new java.util.ArrayList[Message]
     messageList.add(new Message("test".getBytes()))
     tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
-
     Thread.sleep(500)
     val messagesContent = new java.util.ArrayList[String]
@@ -585,7 +584,7 @@
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
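The test hunks above all make the same change: because the producer now resolves a
concrete partition before handing data to the underlying sync/async producer, the
mocked send expectations use partition 0 instead of the random-partition marker -1.
A minimal sketch of that expectation pattern, assuming the kafka 0.7 test imports;
the topic and message values here are illustrative placeholders, not taken from the
patch:

    import org.easymock.EasyMock
    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

    val topic = "test-topic"
    val messageSet = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                              messages = new Message("t".getBytes()))
    val syncProducer = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
    // a concrete partition id is now always resolved, so the mock expects 0, not -1
    syncProducer.send(topic, 0, messageSet)
    EasyMock.expectLastCall
    EasyMock.replay(syncProducer)
    // ... exercise the producer under test, then:
    EasyMock.verify(syncProducer)
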
Index: core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala	(revision 1178446)
+++ core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala	(working copy)
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package kafka.producer
 
 import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig}
@@ -27,6 +27,7 @@
 import collection.SortedSet
 
 private[producer] object ZKBrokerPartitionInfo {
+
   /**
    * Generate a mapping from broker id to (brokerId, numPartitions) for the list of brokers
    * specified
@@ -49,7 +50,7 @@
       }
     }
     brokerParts
-  }
+  }
 }
 
 /**
@@ -89,22 +90,24 @@
    * @return a sequence of (brokerId, numPartitions). Returns a zero-length
    * sequence if no brokers are available.
    */
-  def getBrokerPartitionInfo(topic: String): scala.collection.immutable.SortedSet[Partition] = {
-    val brokerPartitions = topicBrokerPartitions.get(topic)
-    var numBrokerPartitions = SortedSet.empty[Partition]
-    brokerPartitions match {
-      case Some(bp) =>
-        bp.size match {
-          case 0 => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
-            numBrokerPartitions = bootstrapWithExistingBrokers(topic)
-            topicBrokerPartitions += (topic -> numBrokerPartitions)
-          case _ => numBrokerPartitions = TreeSet[Partition]() ++ bp
-        }
-      case None => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
-        numBrokerPartitions = bootstrapWithExistingBrokers(topic)
-        topicBrokerPartitions += (topic -> numBrokerPartitions)
+  def getBrokerPartitionInfo(topic: String): SortedSet[Partition] = {
+    zkWatcherLock synchronized {
+      val brokerPartitions = topicBrokerPartitions.get(topic)
+      var numBrokerPartitions = SortedSet.empty[Partition]
+      brokerPartitions match {
+        case Some(bp) =>
+          bp.size match {
+            case 0 => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
+              numBrokerPartitions = bootstrapWithExistingBrokers(topic)
+              topicBrokerPartitions += (topic -> numBrokerPartitions)
+            case _ => numBrokerPartitions = TreeSet[Partition]() ++ bp
+          }
+        case None => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
+          numBrokerPartitions = bootstrapWithExistingBrokers(topic)
+          topicBrokerPartitions += (topic -> numBrokerPartitions)
+      }
+      numBrokerPartitions
     }
-    numBrokerPartitions
   }
 
   /**
@@ -123,18 +126,28 @@
 
   def close = zkClient.close
 
+  def updateInfo = {
+    zkWatcherLock synchronized {
+      topicBrokerPartitions = getZKTopicPartitionInfo
+      allBrokers = getZKBrokerInfo
+    }
+  }
+
   private def bootstrapWithExistingBrokers(topic: String): scala.collection.immutable.SortedSet[Partition] = {
-    logger.debug("Currently, no brokers are registered under topic: " + topic)
-    logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
+    if(logger.isDebugEnabled) logger.debug("Currently, no brokers are registered under topic: " + topic)
+    if(logger.isDebugEnabled)
+      logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
       "number of partitions = 1")
     val allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath)
-    logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
+    if(logger.isTraceEnabled)
+      logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
     // since we do not have the information about the number of partitions on these brokers, just assume a single partition
     // i.e. pick partition 0 from each broker as a candidate
     val numBrokerPartitions = TreeSet[Partition]() ++ allBrokersIds.map(b => new Partition(b.toInt, 0))
     // add the rest of the available brokers with default 1 partition for this topic, so all of the brokers
     // participate in hosting this topic.
-    logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
+    if(logger.isDebugEnabled)
+      logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
     numBrokerPartitions
   }
 
@@ -154,7 +167,8 @@
       val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
       val brokerPartitions = brokerList.map(bid => bid.toInt).zip(numPartitions)
       val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1)
-      logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
 
       var brokerParts = SortedSet.empty[Partition]
       sortedBrokerPartitions.foreach { bp =>
@@ -164,7 +178,8 @@
         }
       }
       brokerPartitionsPerTopic += (topic -> brokerParts)
-      logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString)
     }
     brokerPartitionsPerTopic
   }
@@ -195,26 +210,35 @@
     private var oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ originalBrokerIdMap
     private val logger = Logger.getLogger(classOf[BrokerTopicsListener])
 
-    logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" +
-      "/broker/topics, /broker/topics/topic, /broker/ids")
-    logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " +
+    if(logger.isDebugEnabled)
+      logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" +
+        "/broker/topics, /broker/topics/topic, /broker/ids")
+    if(logger.isDebugEnabled)
+      logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " +
       "partition id per topic with " + oldBrokerTopicPartitionsMap.toString)
 
     @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+    def handleChildChange(parentPath : String, currentChildren : java.util.List[String]) {
+      val curChilds: java.util.List[String] = if(currentChildren != null) currentChildren
+                                              else new java.util.ArrayList[String]()
+
       zkWatcherLock synchronized {
-        logger.trace("Watcher fired for path: " + parentPath)
+        if(logger.isTraceEnabled)
+          logger.trace("Watcher fired for path: " + parentPath + " with change " + curChilds.toString)
         import scala.collection.JavaConversions._
 
         parentPath match {
           case "/brokers/topics" =>        // this is a watcher for /broker/topics path
             val updatedTopics = asBuffer(curChilds)
-            logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
-              curChilds.toString)
-            logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
-            logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString)
+            if(logger.isDebugEnabled) {
+              logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
+                curChilds.toString)
+              logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
+              logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString)
+            }
            val newTopics = updatedTopics.toSet &~ oldBrokerTopicPartitionsMap.keySet
-            logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString)
+            if(logger.isDebugEnabled)
+              logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString)
             newTopics.foreach { topic =>
               val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
               val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath)
@@ -223,15 +247,17 @@
                 brokerTopicsListener)
             }
           case "/brokers/ids" =>          // this is a watcher for /broker/ids path
-            logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
-              "\t Currently registered list of brokers -> " + curChilds.toString)
+            if(logger.isDebugEnabled)
+              logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
+                "\t Currently registered list of brokers -> " + curChilds.toString)
             processBrokerChange(parentPath, curChilds)
           case _ =>
             val pathSplits = parentPath.split("/")
            val topic = pathSplits.last
            if(pathSplits.length == 4 && pathSplits(2).equals("topics")) {
-              logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
-                " list of brokers -> " + curChilds.toString + " for topic -> " + topic)
+              if(logger.isDebugEnabled)
+                logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
+                  " list of brokers -> " + curChilds.toString + " for topic -> " + topic)
              processNewBrokerInExistingTopic(topic, asBuffer(curChilds))
            }
         }
@@ -247,17 +273,18 @@
       import scala.collection.JavaConversions._
       val updatedBrokerList = asBuffer(curChilds).map(bid => bid.toInt)
       val newBrokers = updatedBrokerList.toSet &~ oldBrokerIdMap.keySet
-      logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
+      if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
       newBrokers.foreach { bid =>
         val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)
         val brokerHostPort = brokerInfo.split(":")
         allBrokers += (bid -> new Broker(bid, brokerHostPort(1), brokerHostPort(1), brokerHostPort(2).toInt))
-        logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid)
+        if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid)
         producerCbk(bid, brokerHostPort(1), brokerHostPort(2).toInt)
       }
       // remove dead brokers from the in memory list of live brokers
       val deadBrokers = oldBrokerIdMap.keySet &~ updatedBrokerList.toSet
-      logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString)
       deadBrokers.foreach {bid =>
         allBrokers = allBrokers - bid
         // also remove this dead broker from particular topics
@@ -266,7 +293,8 @@
           case Some(oldBrokerPartitionList) =>
             val aliveBrokerPartitionList = oldBrokerPartitionList.filter(bp => bp.brokerId != bid)
             topicBrokerPartitions += (topic -> aliveBrokerPartitionList)
-            logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " +
+            if(logger.isDebugEnabled)
+              logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " +
               "Updated list of broker id, partition id = " + aliveBrokerPartitionList.toString)
           case None =>
         }
@@ -285,19 +313,23 @@
       // find the old list of brokers for this topic
       oldBrokerTopicPartitionsMap.get(topic) match {
         case Some(brokersParts) =>
-          logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString)
+          if(logger.isDebugEnabled)
+            logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString)
         case None =>
       }
+
       val updatedBrokerList = curChilds.map(b => b.toInt)
       import ZKBrokerPartitionInfo._
       val updatedBrokerParts:SortedSet[Partition] = getBrokerPartitions(zkClient, topic, updatedBrokerList.toList)
-      logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " +
-        curChilds.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " +
+          curChilds.toString)
       // update the number of partitions on existing brokers
       var mergedBrokerParts: SortedSet[Partition] = TreeSet[Partition]() ++ updatedBrokerParts
       topicBrokerPartitions.get(topic) match {
         case Some(oldBrokerParts) =>
-          logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " +
+          if(logger.isDebugEnabled)
+            logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " +
             oldBrokerParts.toString)
           mergedBrokerParts = oldBrokerParts ++ updatedBrokerParts
         case None =>
@@ -305,16 +337,24 @@
       // keep only brokers that are alive
       mergedBrokerParts = mergedBrokerParts.filter(bp => allBrokers.contains(bp.brokerId))
       topicBrokerPartitions += (topic -> mergedBrokerParts)
-      logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + mergedBrokerParts.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " +
+          mergedBrokerParts.toString)
     }
 
     def resetState = {
-      logger.debug("[BrokerTopicsListener] Before reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString)
+      if(logger.isTraceEnabled)
+        logger.trace("[BrokerTopicsListener] Before resetting broker topic partitions state " +
+          oldBrokerTopicPartitionsMap.toString)
       oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++ topicBrokerPartitions
-      logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString)
-      logger.debug("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] After resetting broker topic partitions state " +
+          oldBrokerTopicPartitionsMap.toString)
+      if(logger.isTraceEnabled)
+        logger.trace("[BrokerTopicsListener] Before resetting broker id map state " + oldBrokerIdMap.toString)
       oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ allBrokers
-      logger.debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] After resetting broker id map state " + oldBrokerIdMap.toString)
     }
   }
 
@@ -322,7 +362,8 @@
    * Handles the session expiration event in zookeeper
    */
   class ZKSessionExpirationListener(val brokerTopicsListener: BrokerTopicsListener)
-          extends IZkStateListener {
+    extends IZkStateListener {
+
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
       // do nothing, since zkclient will do reconnect for us.
@@ -350,7 +391,7 @@
       // NOTE: this is probably not required here. Since when we read from getZKTopicPartitionInfo() above,
       // it automatically recreates the watchers there itself
       topicBrokerPartitions.keySet.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic,
-        brokerTopicsListener))
+                                                                                   brokerTopicsListener))
       // there is no need to re-register other listeners as they are listening on the child changes of
       // permanent nodes
     }
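The heart of the ZKBrokerPartitionInfo change is that cache reads now take the same
zkWatcherLock that the watcher callbacks hold, and the new updateInfo rebuilds the
cached maps wholesale under that lock. Below is a condensed, illustrative sketch of
the pattern; the Partition case class and readFromZookeeper are simplified stand-ins
for the real types and the existing getZKTopicPartitionInfo/getZKBrokerInfo readers,
not a drop-in replacement for the class above:

    import scala.collection.immutable.SortedSet

    // simplified stand-in for the real Partition class
    case class Partition(brokerId: Int, partId: Int)
    object Partition {
      implicit val partitionOrdering: Ordering[Partition] =
        Ordering.by((p: Partition) => (p.brokerId, p.partId))
    }

    class CachedPartitionInfo {
      private val zkWatcherLock = new Object
      private var topicBrokerPartitions = Map.empty[String, SortedSet[Partition]]

      // reads serialize on the same lock the ZK watcher callbacks hold, so a
      // send can never observe the cache mid-update
      def getBrokerPartitionInfo(topic: String): SortedSet[Partition] =
        zkWatcherLock synchronized {
          topicBrokerPartitions.getOrElse(topic, SortedSet.empty[Partition])
        }

      // invoked by the producer when it detects a stale cache: rebuild the
      // whole map from zookeeper inside one critical section
      def updateInfo: Unit =
        zkWatcherLock synchronized {
          topicBrokerPartitions = readFromZookeeper()
        }

      private def readFromZookeeper(): Map[String, SortedSet[Partition]] =
        Map.empty // placeholder for the real getZKTopicPartitionInfo read
    }
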
Index: core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala	(revision 1178446)
+++ core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala	(working copy)
@@ -53,13 +53,15 @@
 
   def close {}
 
+  def updateInfo = {}
+
   /**
    * Generate a sequence of (brokerId, numPartitions) for all brokers
    * specified in the producer configuration
    * @return sequence of (brokerId, numPartitions)
    */
   private def getConfigTopicPartitionInfo(): SortedSet[Partition] = {
-    val brokerInfoList = config.brokerPartitionInfo.split(",")
+    val brokerInfoList = config.brokerList.split(",")
     if(brokerInfoList.size == 0) throw new InvalidConfigException("broker.list is empty")
     // check if each individual broker info is valid => (brokerId: brokerHost: brokerPort)
     brokerInfoList.foreach { bInfo =>
@@ -84,7 +86,7 @@
    */
   private def getConfigBrokerInfo(): Map[Int, Broker] = {
     val brokerInfo = new HashMap[Int, Broker]()
-    val brokerInfoList = config.brokerPartitionInfo.split(",")
+    val brokerInfoList = config.brokerList.split(",")
     brokerInfoList.foreach{ bInfo =>
       val brokerIdHostPort = bInfo.split(":")
       brokerInfo += (brokerIdHostPort(0).toInt -> new Broker(brokerIdHostPort(0).toInt, brokerIdHostPort(1),
Index: core/src/main/scala/kafka/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/producer/Producer.scala	(revision 1178446)
+++ core/src/main/scala/kafka/producer/Producer.scala	(working copy)
@@ -25,6 +25,7 @@
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.ProducerRequest
 import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
+import collection.SortedSet
 
 class Producer[K,V](config: ProducerConfig,
                     partitioner: Partitioner[K],
@@ -35,9 +36,9 @@
 {
   private val logger = Logger.getLogger(classOf[Producer[K, V]])
   private val hasShutdown = new AtomicBoolean(false)
-  if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo))
+  if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList))
     throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
-  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerPartitionInfo))
+  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList))
     logger.warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
   private val random = new java.util.Random
   // check if zookeeper based auto partition discovery is enabled
@@ -94,47 +95,86 @@
            partitioner: Partitioner[K]) =
     this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner,
          new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null)
+
   /**
    * Sends the data, partitioned by key to the topic using either the
    * synchronous or the asynchronous producer
    * @param producerData the producer data object that encapsulates the topic, key and message data
    */
   def send(producerData: ProducerData[K,V]*) {
+    zkEnabled match {
+      case true => zkSend(producerData: _*)
+      case false => configSend(producerData: _*)
+    }
+  }
+
+  private def zkSend(producerData: ProducerData[K,V]*) {
     val producerPoolRequests = producerData.map { pd =>
-      // find the number of broker partitions registered for this topic
-      logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
-      val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
-      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
-      val totalNumPartitions = numBrokerPartitions.length
-      if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+      var brokerIdPartition: Option[Partition] = None
+      var brokerInfoOpt: Option[Broker] = None
 
-      var brokerIdPartition: Partition = null
-      var partition: Int = 0
-      if(zkEnabled) {
-        // get the partition id
-        val partitionId = getPartition(pd.getKey, totalNumPartitions)
-        brokerIdPartition = numBrokerPartitions(partitionId)
-        val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
-        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
-                " on partition " + brokerIdPartition.partId)
-        partition = brokerIdPartition.partId
-      }else {
-        // randomly select a broker
-        val randomBrokerId = random.nextInt(totalNumPartitions)
-        brokerIdPartition = numBrokerPartitions(randomBrokerId)
-        val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
-        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
-                " on a randomly chosen partition")
-        partition = ProducerRequest.RandomPartition
+      var numRetries: Int = 0
+      while(numRetries <= config.zkReadRetries && brokerInfoOpt.isEmpty) {
+        if(numRetries > 0) {
+          logger.info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again")
+          brokerPartitionInfo.updateInfo
+        }
+
+        val numBrokerPartitions = getNumPartitionsForTopic(pd)
+        val totalNumPartitions = numBrokerPartitions.length
+
+        val partitionId = getPartition(pd.getKey, totalNumPartitions)
+        brokerIdPartition = Some(numBrokerPartitions(partitionId))
+        brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.get.brokerId)
+        numRetries += 1
+      }
+
+      brokerInfoOpt match {
+        case Some(brokerInfo) =>
+          if(logger.isDebugEnabled) logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+            " on partition " + brokerIdPartition.get.partId)
+        case None =>
+          throw new NoBrokersForPartitionException("Invalid Zookeeper state. Failed to get partition for topic: " +
+            pd.getTopic + " and key: " + pd.getKey)
       }
       producerPool.getProducerPoolData(pd.getTopic,
-                                       new Partition(brokerIdPartition.brokerId, partition),
+                                       new Partition(brokerIdPartition.get.brokerId, brokerIdPartition.get.partId),
                                        pd.getData)
     }
     producerPool.send(producerPoolRequests: _*)
   }
 
+  private def configSend(producerData: ProducerData[K,V]*) {
+    val producerPoolRequests = producerData.map { pd =>
+      // find the broker partitions registered for this topic
+      val numBrokerPartitions = getNumPartitionsForTopic(pd)
+      val totalNumPartitions = numBrokerPartitions.length
+
+      val partitionId = getPartition(pd.getKey, totalNumPartitions)
+      val brokerIdPartition = numBrokerPartitions(partitionId)
+      val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
+
+      if(logger.isDebugEnabled)
+        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on partition " +
+          brokerIdPartition.partId)
+      producerPool.getProducerPoolData(pd.getTopic,
+                                       new Partition(brokerIdPartition.brokerId, brokerIdPartition.partId),
+                                       pd.getData)
+    }
+    producerPool.send(producerPoolRequests: _*)
+  }
+
+  private def getNumPartitionsForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
+    if(logger.isDebugEnabled)
+      logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
+    val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
+    if(logger.isDebugEnabled)
+      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
+    val totalNumPartitions = numBrokerPartitions.length
+    if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+    numBrokerPartitions
+  }
+
   /**
    * Retrieves the partition id and throws an InvalidPartitionException if
    * the value of partition is not between 0 and numPartitions-1
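The new zkSend above wraps partition resolution in a bounded retry loop: on every
attempt after the first it refreshes the ZK cache via updateInfo, and it throws
NoBrokersForPartitionException only once the retries are exhausted. Stripped of the
Kafka types, the control flow is the sketch below; fetchBroker and refreshCache are
hypothetical stand-ins for brokerPartitionInfo.getBrokerInfo and
brokerPartitionInfo.updateInfo:

    // skeleton of the bounded retry loop, with Kafka types removed
    def resolveBroker(maxRetries: Int)(fetchBroker: () => Option[String],
                                       refreshCache: () => Unit): String = {
      var brokerOpt: Option[String] = None
      var numRetries = 0
      while(numRetries <= maxRetries && brokerOpt.isEmpty) {
        if(numRetries > 0)
          refreshCache()          // refresh the cache only after a failed attempt
        brokerOpt = fetchBroker()
        numRetries += 1
      }
      // fail hard only once the cache has been refreshed maxRetries times
      brokerOpt.getOrElse(throw new RuntimeException("No broker found after " + maxRetries + " retries"))
    }

Note the loop makes maxRetries + 1 attempts in total: the first pass reads the
existing cache, and each subsequent pass re-reads from zookeeper first, mirroring
the numRetries <= config.zkReadRetries condition in zkSend.
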
Index: core/src/main/scala/kafka/producer/ProducerConfig.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerConfig.scala	(revision 1178446)
+++ core/src/main/scala/kafka/producer/ProducerConfig.scala	(working copy)
@@ -28,8 +28,8 @@
   /** For bypassing zookeeper based auto partition discovery, use this config   *
    * to pass in static broker and per-broker partition information. Format-     *
    * brokerid1:host1:port1, brokerid2:host2:port2*/
-  val brokerPartitionInfo = Utils.getString(props, "broker.list", null)
-  if(brokerPartitionInfo != null && Utils.getString(props, "partitioner.class", null) != null)
+  val brokerList = Utils.getString(props, "broker.list", null)
+  if(brokerList != null && Utils.getString(props, "partitioner.class", null) != null)
     throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
 
   /** the partitioner class for partitioning events amongst sub-topics */
@@ -58,4 +58,14 @@
    * If the compression codec is NoCompressionCodec, compression is disabled for all topics
    */
   val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
+
+  /**
+   * The producer using the zookeeper software load balancer maintains a ZK cache that gets
+   * updated by the zookeeper watcher listeners. During some events, like a broker bounce, the
+   * producer ZK cache can get into an inconsistent state for a small time period. In this time
+   * period, it could end up picking a broker partition that is unavailable. When this happens, the
+   * ZK cache needs to be updated.
+   * This parameter specifies the number of times the producer attempts to refresh this ZK cache.
+   */
+  val zkReadRetries = Utils.getInt(props, "zk.read.num.retries", 3)
 }
Index: core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(revision 1178446)
+++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(working copy)
@@ -44,6 +44,15 @@
   def getAllBrokerInfo: Map[Int, Broker]
 
   /**
+   * This is relevant to the ZKBrokerPartitionInfo. It updates the ZK cache
+   * by reading from zookeeper and recreating the data structures. This API
+   * is invoked by the producer when it detects that the ZK cache of
+   * ZKBrokerPartitionInfo is stale.
+   *
+   */
+  def updateInfo
+
+  /**
    * Cleanup
    */
   def close
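Taken together, ProducerConfig.brokerList is the renamed static-broker setting and
zk.read.num.retries bounds the cache refreshes described above. An illustrative
configuration under these changes; the host names, the serializer choice, and the
retry override are placeholders, not values mandated by the patch:

    import java.util.Properties
    import kafka.producer.ProducerConfig

    val props = new Properties()
    props.put("zk.connect", "localhost:2181")                       // zookeeper-based discovery
    props.put("serializer.class", "kafka.serializer.StringEncoder") // illustrative serializer
    props.put("zk.read.num.retries", "5")                           // patch default is 3
    val config = new ProducerConfig(props)
    // a static setup would instead set broker.list (exposed as config.brokerList
    // after the rename); note that partitioner.class cannot be combined with
    // broker.list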