diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 88af911..68dcfe7 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -42,12 +42,6 @@ case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, init
 
   def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
 
-  def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = {
-    if (partition == ProducerRequest.RandomPartition)
-      return randomSelector(topic)
-    else
-      return partition
-  }
 }
 
 object TopicData {
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 32a83c6..14f7611 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -23,7 +23,6 @@ import kafka.network._
 import kafka.utils._
 
 object ProducerRequest {
-  val RandomPartition = -1
   val CurrentVersion: Short = 0
 
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
@@ -84,7 +83,7 @@ case class ProducerRequest( versionId: Short,
     }
   }
 
-  def sizeInBytes(): Int = {
+  def sizeInBytes: Int = {
     var size = 0
     //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
     size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4;
@@ -105,12 +104,11 @@ case class ProducerRequest( versionId: Short,
         clientId == that.clientId &&
         requiredAcks == that.requiredAcks &&
         ackTimeout == that.ackTimeout &&
-        data.toSeq == that.data.toSeq)
+        data.toSeq == that.data.toSeq )
       case _ => false
     }
   }
 
   def getNumTopicPartitions = data.foldLeft(0)(_ + _.partitionData.length)
-  def expectResponse = requiredAcks > 0
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 491b0d7..e2da200 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -34,6 +34,8 @@ object ErrorMapping {
   val WrongPartitionCode = 3
   val InvalidFetchSizeCode = 4
   val InvalidFetchRequestFormatCode = 5
+  val NoLeaderForPartitionCode = 6
+  val NotLeaderForPartitionCode = 7
 
   private val exceptionToCode =
     Map[Class[Throwable], Int](
@@ -41,7 +43,9 @@
     classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
     classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> WrongPartitionCode,
     classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
-    classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode
+    classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
+    classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
+    classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode
   ).withDefaultValue(UnknownCode)
 
   /* invert the mapping */
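Reviewer note: the two new codes plug into ErrorMapping's class-to-code table and its inversion. A minimal standalone sketch of that pattern, using local stand-in exception types rather than the real kafka.common classes:

```scala
object ErrorMappingSketch {
  class NoLeaderForPartition extends RuntimeException
  class NotLeaderForPartition extends RuntimeException

  val UnknownCode = -1
  val NoLeaderForPartitionCode = 6
  val NotLeaderForPartitionCode = 7

  // forward map: exception class -> wire error code; unknown classes fall
  // through to UnknownCode via withDefaultValue
  private val exceptionToCode = Map[Class[_], Int](
    classOf[NoLeaderForPartition] -> NoLeaderForPartitionCode,
    classOf[NotLeaderForPartition] -> NotLeaderForPartitionCode
  ).withDefaultValue(UnknownCode)

  // inverted map: wire error code -> exception class, so the client side can
  // turn a numeric code from the broker back into a typed exception
  private val codeToException = exceptionToCode.map(_.swap)

  def codeFor(e: Throwable): Int = exceptionToCode(e.getClass)
  def exceptionFor(code: Int): Option[Class[_]] = codeToException.get(code)
}
```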
diff --git a/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala b/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala
new file mode 100644
index 0000000..dcd7e42
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that the broker that received the request is not the leader
+ * for the partition and hence cannot handle the request.
+ *
+ */
+class NotLeaderForPartitionException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 633c884..42528e5 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -125,9 +125,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
     if (topic.length <= 0)
       throw new InvalidTopicException("topic name can't be empty")
     if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
-      warn("Wrong partition " + partition + " valid partitions (0," +
-        (topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")
-      throw new InvalidPartitionException("wrong partition " + partition)
+      val error = "Wrong partition %d, valid partitions (0, %d)."
+        .format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
+      warn(error)
+      throw new InvalidPartitionException(error)
    }
     logs.get(topic)
   }
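The LogManager change above builds the error string once so the warn() call and the thrown exception cannot drift apart. A tiny self-contained sketch of the same guard, with the Logging trait and InvalidPartitionException swapped for stand-ins:

```scala
object PartitionRangeSketch {
  // partitionsFor stands in for topicPartitionsMap.getOrElse(topic, numPartitions)
  def validatePartition(topic: String, partition: Int, partitionsFor: String => Int) {
    val numPartitions = partitionsFor(topic)
    if (partition < 0 || partition >= numPartitions) {
      // format once, use for both the log line and the exception message
      val error = "Wrong partition %d, valid partitions (0, %d).".format(partition, numPartitions - 1)
      println("WARN: " + error)                 // stand-in for warn(error)
      throw new IllegalArgumentException(error) // stand-in for InvalidPartitionException
    }
  }
}
```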
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 299f2a8..419dcee 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -19,13 +19,13 @@ package kafka.producer.async
 
 import kafka.api.{ProducerRequest, TopicData, PartitionData}
 import kafka.cluster.Partition
+import kafka.common._
 import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
 import kafka.producer._
 import kafka.serializer.Encoder
+import kafka.utils.{Utils, Logging}
 import scala.collection.Map
 import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.utils.{Utils, Logging}
-import kafka.common.{FailedToSendMessageException, NoLeaderForPartitionException, InvalidPartitionException, NoBrokersForPartitionException}
 
 class DefaultEventHandler[K,V](config: ProducerConfig,                  // this api is for testing
                                private val partitioner: Partitioner[K], // use the other constructor
@@ -71,8 +71,13 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
             .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
         val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
 
-        if((brokerid < 0) || (!send(brokerid, messageSetPerBroker)))
-          failedProduceRequests.appendAll(eventsPerBrokerMap.map(r => r._2).flatten)
+        val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+        for( (topic, partition) <- failedTopicPartitions ) {
+          eventsPerBrokerMap.get(topic, partition) match {
+            case Some(data) => failedProduceRequests.appendAll(data)
+            case None => // nothing
+          }
+        }
       }
     } catch {
       case t: Throwable => error("Failed to send messages")
@@ -156,31 +161,39 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
    *
    * @param brokerId the broker that will receive the request
    * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
+   * @return the set of (topic, partition) pairs that incurred an error while sending or processing
    */
-  private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Boolean = {
-    try {
-      if(brokerId < 0)
-        throw new NoLeaderForPartitionException("No leader for some partition(s) on broker %d".format(brokerId))
-      if(messagesPerTopic.size > 0) {
-        val topics = new HashMap[String, ListBuffer[PartitionData]]()
-        for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
-          topics.get(topicName) match {
-            case Some(x) => trace("found " + topicName)
-            case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic
-          }
-          topics(topicName).append(new PartitionData(partitionId, messagesSet))
-        }
-        val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray))
-        val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray)
+  private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Seq[(String, Int)] = {
+    if(brokerId < 0) {
+      messagesPerTopic.keys.toSeq
+    } else if(messagesPerTopic.size > 0) {
+      val topics = new HashMap[String, ListBuffer[PartitionData]]()
+      for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
+        val partitionData = topics.getOrElseUpdate(topicName, new ListBuffer[PartitionData]())
+        partitionData.append(new PartitionData(partitionId, messagesSet))
+      }
+      val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray
+      val producerRequest =
+        new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData)
+      try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)
-        // TODO: possibly send response to response callback handler
-        trace("kafka producer sent messages for topics %s to broker %d on %s:%d"
+        trace("producer sent messages for topics %s to broker %d on %s:%d"
           .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
+
+        var msgIdx = -1
+        val errors = new ListBuffer[(String, Int)]
+        for( topic <- topicData; partition <- topic.partitionData ) {
+          msgIdx += 1
+          if(msgIdx >= response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError)
+            errors.append((topic.topic, partition.partition))
+        }
+        errors
+      } catch {
+        case e => messagesPerTopic.keys.toSeq
       }
-      true
-    }catch {
-      case t: Throwable => false
+    } else {
+      List.empty
     }
   }
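The behavioral core of the patch is the new send() contract: instead of a single Boolean for the whole request, it returns the (topic, partition) pairs that failed, and the dispatch loop requeues only those events. A compact sketch of that contract with simplified types (String payloads instead of ByteBufferMessageSet; the transport and the broker's per-partition verdicts are stubbed as a function):

```scala
import scala.collection.mutable.ListBuffer

object PartialFailureSketch {
  type TopicPartition = (String, Int)

  // Mirrors the new send(): a negative broker id means no leader was known,
  // so every partition in the batch is reported failed; otherwise only the
  // partitions the (stubbed) broker flagged come back.
  def send(brokerId: Int,
           batch: Map[TopicPartition, Seq[String]],
           failedOnBroker: TopicPartition => Boolean): Seq[TopicPartition] =
    if (brokerId < 0) batch.keys.toSeq
    else batch.keys.filter(failedOnBroker).toSeq

  // Mirrors the new dispatch loop: collect events only for the partitions
  // that failed, leaving acknowledged partitions alone.
  def requeue(brokerId: Int,
              batch: Map[TopicPartition, Seq[String]],
              failedOnBroker: TopicPartition => Boolean): Seq[String] = {
    val failed = new ListBuffer[String]
    for (tp <- send(brokerId, batch, failedOnBroker))
      batch.get(tp).foreach(events => failed.appendAll(events))
    failed.toList
  }
}
```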
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 82ca5fc..257fd8b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -67,18 +67,17 @@ class KafkaApis(val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper)
     for( topicData <- request.data ) {
       for( partitionData <- topicData.partitionData ) {
         msgIndex += 1
-        val partition = partitionData.getTranslatedPartition(topicData.topic, logManager.chooseRandomPartition)
         try {
           // TODO: need to handle ack's here!  Will probably move to another method.
-          kafkaZookeeper.ensurePartitionOnThisBroker(topicData.topic, partition)
-          val log = logManager.getOrCreateLog(topicData.topic, partition)
+          kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
+          val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
           log.append(partitionData.messages)
           offsets(msgIndex) = log.nextAppendOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
           trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
         } catch {
           case e =>
-            error("Error processing ProducerRequest on " + topicData.topic + ":" + partition, e)
+            error("Error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e)
             e match {
               case _: IOException =>
                 fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
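KafkaApis now records an offset and an error code per message index, so one bad partition no longer poisons the whole produce request. A hedged sketch of that bookkeeping, with the leader check and log append stubbed out as functions:

```scala
object ProduceBookkeepingSketch {
  val NoError: Short = 0

  // ensureLeader throws when this broker is not the partition's leader;
  // append returns the next offset; codeFor maps an exception to a wire
  // code. All three are stand-ins for the real KafkaZooKeeper, LogManager,
  // and ErrorMapping calls.
  def handle(partitions: Seq[Int],
             ensureLeader: Int => Unit,
             append: Int => Long,
             codeFor: Throwable => Short): (Array[Long], Array[Short]) = {
    val offsets = new Array[Long](partitions.size)
    val errors = new Array[Short](partitions.size)
    for ((partition, i) <- partitions.zipWithIndex) {
      try {
        ensureLeader(partition)
        offsets(i) = append(partition)
        errors(i) = NoError
      } catch {
        case e: Exception =>
          offsets(i) = -1L        // offset slot stays -1 on failure
          errors(i) = codeFor(e)  // e.g. NotLeaderForPartitionCode
      }
    }
    (offsets, errors)
  }
}
```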
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index 43898c6..526e549 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -17,11 +17,11 @@
 
 package kafka.server
 
-import kafka.utils._
-import org.apache.zookeeper.Watcher.Event.KeeperState
 import java.net.InetAddress
-import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
 import kafka.cluster.Replica
+import kafka.common.{NoLeaderForPartitionException, NotLeaderForPartitionException, InvalidPartitionException, KafkaZookeeperClient}
+import kafka.utils._
+import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
 
 /**
@@ -98,10 +98,15 @@ class KafkaZooKeeper(config: KafkaConfig,
     }
   }
 
-  def ensurePartitionOnThisBroker(topic: String, partition: Int) {
-    if(!ZkUtils.isPartitionOnBroker(zkClient, topic, partition, config.brokerId))
-      throw new InvalidPartitionException("Broker %d does not host partition %d for topic %s".
-        format(config.brokerId, partition, topic))
+  def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
+    ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
+      case Some(leader) =>
+        if(leader != config.brokerId)
+          throw new NotLeaderForPartitionException("Broker %d is not the leader for partition %d of topic %s"
+            .format(config.brokerId, partition, topic))
+      case None =>
+        throw new NoLeaderForPartitionException("There is no leader for topic %s partition %d".format(topic, partition))
+    }
   }
 
   def getZookeeperClient = zkClient
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 20ee9ba..1c5aec0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -23,7 +23,7 @@ import collection.mutable
 
 class ReplicaManager(config: KafkaConfig) extends Logging {
 
-  private var replicas: mutable.Map[(String, Int), Replica] = new mutable.HashMap[(String, Int), Replica]()
+  private val replicas = new mutable.HashMap[(String, Int), Replica]()
 
   def addLocalReplica(topic: String, partitionId: Int, log: Log): Replica = {
     val replica = replicas.get((topic, partitionId))
@@ -37,7 +37,7 @@ class ReplicaManager(config: KafkaConfig) extends Logging {
       case None =>
         val partition = new Partition(topic, partitionId)
         val replica = new Replica(config.brokerId, partition, topic, Some(log), log.getHighwaterMark, log.maxSize, true)
-        replicas += (topic, partitionId) -> replica
+        replicas.put((topic, partitionId), replica)
         info("Added local replica for topic %s partition %s on broker %d"
           .format(replica.topic, replica.partition.partId, replica.brokerId))
     }
@@ -51,7 +51,7 @@ class ReplicaManager(config: KafkaConfig) extends Logging {
       case None =>
         val partition = new Partition(topic, partitionId)
         val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
-        replicas += (topic, partitionId) -> replica
+        replicas.put((topic, partitionId), replica)
         info("Added remote replica for topic %s partition %s on broker %d"
           .format(replica.topic, replica.partition.partId, replica.brokerId))
     }
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 50b35d8..52ac26e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -83,12 +83,6 @@ object ZkUtils extends Logging {
     }
   }
 
-  def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
-    val replicas = getReplicasForPartition(zkClient, topic, partition)
-    debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas))
-    replicas.contains(brokerId.toString)
-  }
-
   def tryToBecomeLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
     try {
       createEphemeralPathExpectConflict(zkClient, getTopicPartitionLeaderPath(topic, partition.toString), brokerId.toString)
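The replacement check reads the leader from ZooKeeper and distinguishes two failure modes: no leader elected yet versus a leader that is some other broker. A standalone sketch with the ZooKeeper read stubbed as a function (the exception classes here are local stand-ins for the kafka.common ones):

```scala
class NoLeaderSketch(msg: String) extends RuntimeException(msg)
class NotLeaderSketch(msg: String) extends RuntimeException(msg)

object LeaderCheckSketch {
  // readLeader stands in for ZkUtils.getLeaderForPartition: None means no
  // leader has been elected for the partition yet.
  def ensureLeader(brokerId: Int, topic: String, partition: Int,
                   readLeader: (String, Int) => Option[Int]) {
    readLeader(topic, partition) match {
      case Some(leader) if leader != brokerId =>
        throw new NotLeaderSketch(
          "Broker %d is not the leader for partition %d of topic %s".format(brokerId, partition, topic))
      case None =>
        throw new NoLeaderSketch(
          "There is no leader for topic %s partition %d".format(topic, partition))
      case _ => // this broker is the leader; fall through
    }
  }
}
```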
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index d1f0fe4..d949162 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -87,6 +87,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     zkConsumerConnector0.shutdown
 
+    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+
     // send some messages to each broker
     val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
     val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
@@ -98,9 +101,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
-
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1.size, receivedMessages1.size)
     assertEquals(sentMessages1, receivedMessages1)
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index d0b57c0..4922ed8 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -17,19 +17,21 @@
 
 package kafka.integration
 
+import java.io.File
 import java.nio.ByteBuffer
+import java.util.Properties
 import junit.framework.Assert._
+import kafka.admin.CreateTopicCommand
 import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder}
 import kafka.common.{FetchRequestFormatException, OffsetOutOfRangeException, InvalidPartitionException}
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
-import java.util.Properties
+import kafka.message.Message
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
 import kafka.serializer.StringDecoder
-import kafka.message.Message
-import java.io.File
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import kafka.utils.{TestZKUtils, TestUtils}
+import org.apache.log4j.{Level, Logger}
+import org.I0Itec.zkclient.ZkClient
+import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 
 /**
@@ -143,6 +145,9 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
   }
 
   def testProduceAndMultiFetch() {
+    val zkClient = zookeeper.client
+    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
@@ -208,6 +213,9 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
   }
 
   def testProduceAndMultiFetchWithCompression() {
+    val zkClient = zookeeper.client
+    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
@@ -273,6 +281,9 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
   }
 
   def testMultiProduce() {
+    val zkClient = zookeeper.client
+    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId)
+
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, Seq[Message]]
@@ -329,4 +340,15 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
     val logFile = new File(config.logDir, newTopic + "-0")
     assertTrue(!logFile.exists)
   }
+
+  /**
+   * For testing purposes: create each of these topics with one partition and one replica,
+   * with the provided broker as the intended leader, then wait until that broker has been
+   * elected leader for the topic.
+   */
+  def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
+    for( topic <- topics ) {
+      CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
+      TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index e35b1bf..4758836 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -24,13 +24,13 @@ import org.easymock.EasyMock
 import org.junit.Test
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.common.{InvalidConfigException, NoBrokersForPartitionException, InvalidPartitionException}
+import kafka.common.{ErrorMapping, InvalidConfigException, NoBrokersForPartitionException, InvalidPartitionException}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.producer.async._
 import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils._
-import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
+import kafka.utils.{FixedValuePartitioner, NegativePartitioner, TestZKUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import collection.Map
 import collection.mutable.ListBuffer
@@ -236,7 +236,7 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
     props.put("zk.connect", zkConnect)
     val config = new ProducerConfig(props)
     // form expected partitions metadata
-    val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092)
+    val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
     val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
 
     val producerPool = getMockProducerPool(config, syncProducer)
@@ -260,7 +260,7 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
     val config = new ProducerConfig(props)
 
     // form expected partitions metadata
-    val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092)
+    val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
 
     val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
 
@@ -333,8 +333,8 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
     val config = new ProducerConfig(props)
 
     // create topic metadata with 0 partitions
-    val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092)
-    val topic2Metadata = getTopicMetadata("topic2", 0, "localhost", 9092)
+    val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
+    val topic2Metadata = getTopicMetadata("topic2", 0, 0, "localhost", 9092)
 
     val syncProducer = getSyncProducer(List("topic1", "topic2"), List(topic1Metadata, topic2Metadata))
 
@@ -375,17 +375,17 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
     val config = new ProducerConfig(props)
 
     val topic = "topic1"
-    val topic1Metadata = getTopicMetadata(topic, 0, "localhost", 9092)
+    val topic1Metadata = getTopicMetadata(topic, 0, 0, "localhost", 9092)
 
     val msgs = TestUtils.getMsgStrings(10)
 
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
     EasyMock.expectLastCall().andReturn(List(topic1Metadata))
     mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.take(5))))
-    EasyMock.expectLastCall().andReturn(null)
+    EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L)))
     mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.takeRight(5))))
-    EasyMock.expectLastCall().andReturn(null)
-    EasyMock.replay(mockSyncProducer)
+    EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L)))
+    EasyMock.replay(mockSyncProducer)
 
     val producerPool = EasyMock.createMock(classOf[ProducerPool])
     producerPool.getZkClient
@@ -420,6 +420,70 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   @Test
+  def testFailedSendRetryLogic() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+
+    val topic1 = "topic1"
+    val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092)
+    val msgs = TestUtils.getMsgStrings(2)
+
+    // producer used to return topic metadata
+    val metadataSyncProducer = EasyMock.createMock(classOf[SyncProducer])
+    metadataSyncProducer.send(new TopicMetadataRequest(List(topic1)))
+    EasyMock.expectLastCall().andReturn(List(topic1Metadata)).times(3)
+    EasyMock.replay(metadataSyncProducer)
+
+    // produce request for topic1 and partitions 0 and 1. Let the first request fail
+    // entirely. The second request will succeed for partition 1 but fail for partition 0.
+    // On the third try for partition 0, let it succeed.
+    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
+    val response1 =
+      new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
+    val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
+    val response2 = new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L))
+    val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
+    EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
+    EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)
+    EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response2)
+    EasyMock.replay(mockSyncProducer)
+
+    val producerPool = EasyMock.createMock(classOf[ProducerPool])
+    EasyMock.expect(producerPool.getZkClient).andReturn(zkClient)
+    EasyMock.expect(producerPool.addProducers(config))
+    EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
+    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
+    EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
+    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
+    EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
+    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
+    EasyMock.expect(producerPool.close())
+    EasyMock.replay(producerPool)
+
+    val handler = new DefaultEventHandler[Int,String](config,
+                                                      partitioner = new FixedValuePartitioner(),
+                                                      encoder = new StringEncoder,
+                                                      producerPool = producerPool)
+    try {
+      val data = List(
+        new ProducerData[Int,String](topic1, 0, msgs),
+        new ProducerData[Int,String](topic1, 1, msgs)
+      )
+      handler.handle(data)
+      handler.close()
+    } catch {
+      case e: Exception => fail("Not expected", e)
+    }
+
+    EasyMock.verify(metadataSyncProducer)
+    EasyMock.verify(mockSyncProducer)
+    EasyMock.verify(producerPool)
+  }
+
+  @Test
   def testJavaProducer() {
     val topic = "topic1"
     val msgs = TestUtils.getMsgStrings(5)
@@ -488,10 +552,13 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
     producerPool
   }
 
-  private def getTopicMetadata(topic: String, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
+  private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
+    getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort)
+  }
+
+  private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
     val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort)
-    val partition1Metadata = new PartitionMetadata(brokerId, Some(broker1), List(broker1))
-    new TopicMetadata(topic, List(partition1Metadata))
+    new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
   }
 
   class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {
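testFailedSendRetryLogic exercises exactly the requeue behavior sketched earlier: attempt 1 throws and both partitions are retried, attempt 2 returns a NotLeader code for partition 0 only, and attempt 3 succeeds. An illustrative driver for that shrinking-retry shape (names hypothetical, not the handler's actual API):

```scala
object RetryDriverSketch {
  // sendOnce returns the subset of items that still failed; keep retrying
  // just that subset until it is empty or the attempt budget runs out.
  def retryUntilClean[T](maxAttempts: Int, sendOnce: Seq[T] => Seq[T], initial: Seq[T]): Seq[T] = {
    var remaining = initial
    var attempt = 0
    while (remaining.nonEmpty && attempt < maxAttempts) {
      remaining = sendOnce(remaining)
      attempt += 1
    }
    remaining // non-empty here means the send ultimately failed
  }
}
```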
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index fca0ede..a535fd4 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -92,7 +92,6 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
 
   @Test
   def testProduceCorrectlyReceivesResponse() {
-    // TODO: this will need to change with kafka-44
     val server = servers.head
     val props = new Properties()
     props.put("host", "localhost")
@@ -106,21 +105,25 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
 
     // #1 - test that we get an error when partition does not belong to broker in response
-    val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages)
+    val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1)
     val response = producer.send(request)
 
+    Assert.assertNotNull(response)
     Assert.assertEquals(request.correlationId, response.correlationId)
     Assert.assertEquals(response.errors.length, response.offsets.length)
     Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort, _))
+    response.errors.foreach(Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, _))
     response.offsets.foreach(Assert.assertEquals(-1L, _))
 
-    // #2 - test that we get correct offsets when partition is owner by broker
+    // #2 - test that we get correct offsets when partition is owned by broker
     val zkClient = zookeeper.client
     CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "topic1", 0, 500)
     CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "topic3", 0, 500)
 
     val response2 = producer.send(request)
+    Assert.assertNotNull(response2)
     Assert.assertEquals(request.correlationId, response2.correlationId)
     Assert.assertEquals(response2.errors.length, response2.offsets.length)
     Assert.assertEquals(3, response2.errors.length)
@@ -132,7 +135,7 @@
     Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
 
     // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort, response2.errors(1))
+    Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, response2.errors(1))
     Assert.assertEquals(-1, response2.offsets(1))
   }
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5128cc9..adf1dd5 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -343,29 +343,20 @@ object TestUtils extends Logging {
   /**
    * Create a wired format request based on simple basic information
    */
-  def produceRequest(topic: String, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
-    produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,ProducerRequest.RandomPartition,message)
-  }
 
   def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
     produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
   }
 
-  def produceRequestWithAcks(topics: Seq[String], partitions: Seq[Int], message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
-    val correlationId = SyncProducerConfig.DefaultCorrelationId
-    val clientId = SyncProducerConfig.DefaultClientId
-    val requiredAcks: Short = 1.toShort
-    val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs
-    val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
-    new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data.toArray)
+  def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
+    produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks)
   }
 
-  def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
+  def produceRequestWithAcks(topics: Seq[String], partitions: Seq[Int], message: ByteBufferMessageSet, acks: Int): kafka.api.ProducerRequest = {
+    val correlationId = SyncProducerConfig.DefaultCorrelationId
     val clientId = SyncProducerConfig.DefaultClientId
-    val requiredAcks: Short = SyncProducerConfig.DefaultRequiredAcks
     val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs
-    var partitionData = Array[PartitionData]( new PartitionData(partition, message) )
-    var data = Array[TopicData]( new TopicData(topic, partitionData) )
-    new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
+    val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
+    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeout, data.toArray)
  }
 
   def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
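One detail worth keeping in mind when reading the assertions above: a producer response carries parallel errors and offsets arrays, one slot per (topic, partition) in request order. A tiny illustrative reader (the case class is a stand-in, not the real kafka.api.ProducerResponse):

```scala
case class ResponseSketch(correlationId: Int, errors: Array[Short], offsets: Array[Long])

object ResponseSketch {
  val NoError: Short = 0

  // indices whose error slot holds a non-zero code; for these the tests
  // above expect the matching offsets slot to be -1L
  def failedIndices(r: ResponseSketch): Seq[Int] =
    r.errors.indices.filter(i => r.errors(i) != NoError)
}
```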