Index: core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (revision 1415914)
+++ core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (working copy)
@@ -23,15 +23,20 @@
 import kafka.producer.KeyedMessage
 import kafka.serializer.StringEncoder
 import kafka.admin.CreateTopicCommand
-import kafka.utils.TestUtils
+import kafka.utils.{Logging, TestUtils}
 import junit.framework.Assert._
 
-class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   val props = createBrokerConfigs(2)
-  val configs = props.map(p => new KafkaConfig(p))
+  val configs = props.map(p => new KafkaConfig(p) {
+    override val flushInterval = 1
+    override val replicaFetchSize = 30
+    override val replicaUpperFetchSize = 60
+  })
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"
+  val topic3 = "longfoobar"
 
   override def setUp() {
     super.setUp()
@@ -48,23 +53,27 @@
     val testMessageList1 = List("test1", "test2", "test3", "test4")
     val testMessageList2 = List("test5", "test6", "test7", "test8")
+    val testMessageList3 = List("averylongmessage")
+
     // create a topic and partition and await leadership
-    for (topic <- List(topic1,topic2)) {
+    for (topic <- List(topic1, topic2, topic3)) {
       CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     }
 
     // send test messages to leader
-    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), 
+    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
                                                             new StringEncoder(), new StringEncoder())
-    val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
+    val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++
+                   testMessageList2.map(m => new KeyedMessage(topic2, m, m)) ++
+                   testMessageList3.map(m => new KeyedMessage(topic3, m, m))
     producer.send(messages:_*)
     producer.close()
 
     def logsMatch(): Boolean = {
       var result = true
-      for (topic <- List(topic1, topic2)) {
+      for (topic <- List(topic1, topic2, topic3)) {
         val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset
         result = result && expectedOffset > 0 && brokers.foldLeft(true) {
           (total, item) => total && (expectedOffset == item.getLogManager().getLog(topic, partition).get.logEndOffset) }
Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerConfig.scala (revision 1415914)
+++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala (working copy)
@@ -68,6 +68,11 @@
   /** the number of byes of messages to attempt to fetch */
   val fetchSize = props.getInt("fetch.size", FetchSize)
 
+  /**
+   * If a fetch results in a partial message, a refetch will be issued if
+   * upperFetchSize > fetchSize.
+   */
+  val upperFetchSize = props.getInt("upper.fetch.size", FetchSize)
 
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
   val autoCommit = props.getBoolean("autocommit.enable", AutoCommit)
Index: core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (revision 1415914)
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (working copy)
@@ -34,7 +34,8 @@
         sourceBroker = sourceBroker,
         socketTimeout = config.socketTimeoutMs,
         socketBufferSize = config.socketBufferSize,
-        fetchSize = config.fetchSize,
+        lowerFetchSize = config.fetchSize,
+        upperFetchSize = config.upperFetchSize,
         fetcherBrokerId = Request.OrdinaryConsumerId,
         maxWait = config.maxFetchWaitMs,
         minBytes = config.minFetchBytes) {
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala (revision 1415914)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala (working copy)
@@ -148,6 +148,7 @@
   /* the number of byes of messages to attempt to fetch */
   val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize)
+  val replicaUpperFetchSize = props.getInt("replica.upper.fetch.size", ConsumerConfig.FetchSize)
 
   /* max wait time for each fetcher request issued by follower replicas*/
   val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500)
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (revision 1415914)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (working copy)
@@ -32,7 +32,8 @@
         sourceBroker = sourceBroker,
         socketTimeout = brokerConfig.replicaSocketTimeoutMs,
         socketBufferSize = brokerConfig.replicaSocketBufferSize,
-        fetchSize = brokerConfig.replicaFetchSize,
+        lowerFetchSize = brokerConfig.replicaFetchSize,
+        upperFetchSize = brokerConfig.replicaUpperFetchSize,
         fetcherBrokerId = brokerConfig.brokerId,
         maxWait = brokerConfig.replicaMaxWaitTimeMs,
         minBytes = brokerConfig.replicaMinBytes) {
Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala (revision 1415914)
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala (working copy)
@@ -23,7 +23,7 @@
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
 import kafka.message.MessageAndOffset
-import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
+import kafka.api.{FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
@@ -35,8 +35,9 @@
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
-                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
+abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int,
+                                     socketBufferSize: Int, lowerFetchSize: Int, upperFetchSize: Int,
+                                     fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val partitionMapLock = new ReentrantLock
@@ -64,88 +65,116 @@
     simpleConsumer.close()
   }
 
-  override def doWork() {
-    val fetchRequestuilder = new FetchRequestBuilder().
+  private case class TopicPartitionAndOffset(topic: String, partition: Int, offset: Long)
+  private case class FetchResult(errorPartitions: Set[TopicAndPartition],
+                                 incompletePartitions: Set[TopicPartitionAndOffset])
+
+  private def doFetch(partitions: Set[TopicPartitionAndOffset], fetchSize: Int): FetchResult = {
+    val fetchRequestBuilder = new FetchRequestBuilder().
       clientId(clientId + "-" + brokerInfo).
       replicaId(fetcherBrokerId).
       maxWait(maxWait).
      minBytes(minBytes)
-    partitionMapLock.lock()
-    try {
-      while (partitionMap.isEmpty)
-        partitionMapCond.await()
-      partitionMap.foreach {
-        case((topicAndPartition, offset)) =>
-          fetchRequestuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
-                                      offset, fetchSize)
-      }
-    } finally {
-      partitionMapLock.unlock()
-    }
+    partitions.foreach(tpo => fetchRequestBuilder.addFetch(tpo.topic, tpo.partition, tpo.offset, fetchSize))
+    val fetchRequest = fetchRequestBuilder.build()
 
-    val fetchRequest = fetchRequestuilder.build()
-    val partitionsWithError = new mutable.HashSet[TopicAndPartition]
-    var response: FetchResponse = null
     try {
       trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
-      response = simpleConsumer.fetch(fetchRequest)
-    } catch {
-      case t =>
-        debug("error in fetch %s".format(fetchRequest), t)
-        if (isRunning.get) {
-          partitionMapLock synchronized {
-            partitionsWithError ++= partitionMap.keys
-          }
-        }
-    }
-    fetcherMetrics.requestRate.mark()
-
-    if (response != null) {
+      fetcherMetrics.requestRate.mark()
+      val response = simpleConsumer.fetch(fetchRequest)
       // process fetched data
       partitionMapLock.lock()
       try {
-        response.data.foreach {
-          case(topicAndPartition, partitionData) =>
-            val (topic, partitionId) = topicAndPartition.asTuple
-            val currentOffset = partitionMap.get(topicAndPartition)
-            if (currentOffset.isDefined) {
-              partitionData.error match {
-                case ErrorMapping.NoError =>
-                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
-                  val validBytes = messages.validBytes
-                  val newOffset = messages.lastOption match {
-                    case Some(m: MessageAndOffset) => m.nextOffset
-                    case None => currentOffset.get
-                  }
-                  partitionMap.put(topicAndPartition, newOffset)
-                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
-                  fetcherMetrics.byteRate.mark(validBytes)
+        response.data.foldLeft(FetchResult(Set.empty, Set.empty))((folded, curr) => {
+          val topicAndPartition = curr._1
+          val partitionData = curr._2
+          val (topic, partitionId) = topicAndPartition.asTuple
+          val currentOffset = partitionMap.get(topicAndPartition)
+          if (currentOffset.isDefined) {
+            partitionData.error match {
+              case ErrorMapping.NoError =>
+                val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+                val validBytes = messages.validBytes
+                val newOffset = messages.lastOption match {
+                  case Some(m: MessageAndOffset) => m.nextOffset
+                  case None => currentOffset.get
+                }
+                partitionMap.put(topicAndPartition, newOffset)
+                fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
+                fetcherMetrics.byteRate.mark(validBytes)
+                if (validBytes == 0)
+                  FetchResult(folded.errorPartitions,
+                              folded.incompletePartitions +
+                                TopicPartitionAndOffset(topic, partitionId, currentOffset.get))
+                else {
                   // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                   processPartitionData(topicAndPartition, currentOffset.get, partitionData)
-                case ErrorMapping.OffsetOutOfRangeCode =>
-                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                  partitionMap.put(topicAndPartition, newOffset)
-                  warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
-                    .format(currentOffset.get, topic, partitionId, newOffset))
-                case _ =>
-                  warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
-                    ErrorMapping.exceptionFor(partitionData.error))
-                  partitionsWithError += topicAndPartition
-              }
+                  folded
+                }
+              case ErrorMapping.OffsetOutOfRangeCode =>
+                val newOffset = handleOffsetOutOfRange(topicAndPartition)
+                partitionMap.put(topicAndPartition, newOffset)
+                warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
+                  .format(currentOffset.get, topic, partitionId, newOffset))
+                folded
+              case _ =>
+                warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
+                  ErrorMapping.exceptionFor(partitionData.error))
+                FetchResult(folded.errorPartitions + topicAndPartition, folded.incompletePartitions)
+            }
-            }
-        }
-      } finally {
+          }
+          else
+            folded
+        })
+      }
+      finally {
        partitionMapLock.unlock()
      }
    }
+    catch {
+      case t: Throwable =>
+        debug("error in fetch %s".format(fetchRequest), t)
+        if (isRunning.get) {
+          FetchResult(partitions.map(e => TopicAndPartition(e.topic, e.partition)), Set.empty)
+        }
+        else
+          FetchResult(Set.empty, Set.empty)
 
-    if(partitionsWithError.size > 0) {
-      debug("handling partitions with error for %s".format(partitionsWithError))
-      handlePartitionsWithErrors(partitionsWithError)
     }
   }
 
+  override def doWork() {
+    partitionMapLock.lock()
+    val partitionsToFetch: Set[TopicPartitionAndOffset] = try {
+      while (partitionMap.isEmpty)
+        partitionMapCond.await()
+      partitionMap.iterator.map {
+        case (topicAndPartition, offset) =>
+          TopicPartitionAndOffset(topicAndPartition.topic, topicAndPartition.partition, offset)
+      }.toSet
+    }
+    finally {
+      partitionMapLock.unlock()
+    }
+    val FetchResult(errorPartitions1, incompletePartitions1) = doFetch(partitionsToFetch, lowerFetchSize)
+    val FetchResult(errorPartitions2, incompletePartitions2) =
+      if (incompletePartitions1.nonEmpty && upperFetchSize > lowerFetchSize) {
+        logger.warn("Retrying fetch with fetch size %d for incomplete partitions %s."
+          .format(upperFetchSize, incompletePartitions1))
+        doFetch(incompletePartitions1, upperFetchSize)
+      }
+      else FetchResult(Set.empty, Set.empty)
+    if (incompletePartitions2.nonEmpty)
+      logger.warn("After fetch retry, the following partitions are still incomplete: " + incompletePartitions2)
+
+    val allErrorPartitions = errorPartitions1 ++ errorPartitions2
+    if(allErrorPartitions.nonEmpty) {
+      debug("handling partitions with error for %s".format(allErrorPartitions))
+      handlePartitionsWithErrors(allErrorPartitions)
    }
  }
+
   def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
     partitionMapLock.lock()
     try {
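
Reviewer note (not part of the patch): the new knob pairs are "fetch.size" / "upper.fetch.size" on the consumer (ConsumerConfig.fetchSize / upperFetchSize) and "replica.fetch.size" / "replica.upper.fetch.size" on the broker (KafkaConfig.replicaFetchSize / replicaUpperFetchSize). Both upper sizes default to ConsumerConfig.FetchSize, so the refetch stays off unless the upper size is explicitly set higher than the lower one. A minimal sketch of how a consumer might opt in, assuming only the property names introduced by this patch; the other required consumer properties and all values shown are illustrative and depend on the surrounding code base:

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    val props = new Properties()
    props.put("groupid", "test-group")        // assumed required consumer property; exact name may differ by version
    props.put("zk.connect", "localhost:2181") // assumed ZooKeeper connection property; exact name may differ by version
    props.put("fetch.size", (1024 * 1024).toString)            // normal (lower) fetch size per partition
    props.put("upper.fetch.size", (10 * 1024 * 1024).toString) // larger size used only to refetch partial messages

    val config = new ConsumerConfig(props)
    // config.upperFetchSize > config.fetchSize, so a partition whose fetch
    // returns only a partial message is refetched once at the 10 MB limit.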
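
The heart of the change is the two-pass doWork(): one fetch of every partition at lowerFetchSize, then a single retry at upperFetchSize restricted to the partitions whose first response contained no complete message (validBytes == 0). The following self-contained sketch restates that control flow with hypothetical stand-in types; it illustrates the pattern and is not code from the patch:

    object TwoPassFetchSketch {
      // Hypothetical stand-ins for TopicPartitionAndOffset / FetchResult in the patch.
      case class Partition(topic: String, id: Int, offset: Long)
      case class Outcome(errors: Set[Partition], incomplete: Set[Partition])

      // `fetch` plays the role of doFetch(): given a set of partitions and a fetch
      // size, it reports which partitions errored and which were incomplete.
      def fetchTwice(partitions: Set[Partition],
                     lowerFetchSize: Int,
                     upperFetchSize: Int,
                     fetch: (Set[Partition], Int) => Outcome): Set[Partition] = {
        // First pass: every partition at the normal (lower) fetch size.
        val first = fetch(partitions, lowerFetchSize)

        // Second pass: only the incomplete partitions, and only if a larger size is configured.
        val second =
          if (first.incomplete.nonEmpty && upperFetchSize > lowerFetchSize)
            fetch(first.incomplete, upperFetchSize)
          else
            Outcome(Set.empty, Set.empty)

        // Partitions still incomplete after the retry are only logged by the patch;
        // error partitions from both passes go to handlePartitionsWithErrors.
        first.errors ++ second.errors
      }
    }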