diff --git a/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 660a977..7b23223 100644
--- a/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -24,12 +24,11 @@ import kafka.utils.{VerifiableProperties, ZKConfig}
 object ConsumerConfig {
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
-  val FetchSize = 1024 * 1024
-  val MaxFetchSize = 10*FetchSize
+  val FetchSize = 10 * 1024 * 1024
   val DefaultFetcherBackoffMs = 1000
   val AutoCommit = true
   val AutoCommitInterval = 10 * 1000
-  val MaxQueuedChunks = 10
+  val MaxQueuedChunks = 2
   val MaxRebalanceRetries = 4
   val AutoOffsetReset = OffsetRequest.SmallestTimeString
   val ConsumerTimeoutMs = -1
@@ -66,9 +65,12 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** the socket receive buffer for network requests */
   val socketBufferSize = props.getInt("socket.buffersize", SocketBufferSize)

-  /** the number of byes of messages to attempt to fetch */
+  /**
+   * The number of bytes allocated for a multi-fetch request (to be divided
+   * evenly across all partitions in the fetch request).
+   */
   val fetchSize = props.getInt("fetch.size", FetchSize)
-
+
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
   val autoCommit = props.getBoolean("autocommit.enable", AutoCommit)

diff --git a/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 6d73c82..ea70db4 100644
--- a/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -23,7 +23,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
 import kafka.message.MessageAndOffset
-import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
+import kafka.api.{FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
@@ -35,8 +35,9 @@ import java.util.concurrent.locks.ReentrantLock
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
-                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
+abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int,
+                                     socketBufferSize: Int, fetchSize: Int,
+                                     fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
   private val partitionMapLock = new ReentrantLock
@@ -64,85 +65,128 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     simpleConsumer.close()
   }

-  override def doWork() {
-    val fetchRequestuilder = new FetchRequestBuilder().
+  private case class TopicPartitionAndOffset(topic: String, partition: Int, offset: Long)
+  private case class FetchResult(errorPartitions: Set[TopicAndPartition],
+                                 incompletePartitions: Set[TopicPartitionAndOffset])
+
+  private def fetch(partitions: Set[TopicPartitionAndOffset], fetchSize: Int): FetchResult = {
+    val fetchRequestBuilder = new FetchRequestBuilder().
             clientId(clientId + "-" + brokerInfo).
             replicaId(fetcherBrokerId).
             maxWait(maxWait).
             minBytes(minBytes)
-    partitionMapLock.lock()
-    try {
-      while (partitionMap.isEmpty)
-        partitionMapCond.await()
-      partitionMap.foreach {
-        case((topicAndPartition, offset)) =>
-          fetchRequestuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
-                           offset, fetchSize)
-      }
-    } finally {
-      partitionMapLock.unlock()
-    }
+    val partitionFetchSize = fetchSize / partitions.size /* could be very small or even zero */
+    partitions.foreach(tpo => fetchRequestBuilder.addFetch(tpo.topic, tpo.partition, tpo.offset, partitionFetchSize))
+    val fetchRequest = fetchRequestBuilder.build()

-    val fetchRequest = fetchRequestuilder.build()
-    val partitionsWithError = new mutable.HashSet[TopicAndPartition]
-    var response: FetchResponse = null
     try {
       trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
-      response = simpleConsumer.fetch(fetchRequest)
-    } catch {
-      case t =>
-        debug("error in fetch %s".format(fetchRequest), t)
-        if (isRunning.get) {
-          partitionMapLock synchronized {
-            partitionsWithError ++= partitionMap.keys
-          }
-        }
-    }
-    fetcherMetrics.requestRate.mark()
-
-    if (response != null) {
+      fetcherMetrics.requestRate.mark()
+      val response = simpleConsumer.fetch(fetchRequest)
       // process fetched data
       partitionMapLock.lock()
       try {
-        response.data.foreach {
-          case(topicAndPartition, partitionData) =>
-            val (topic, partitionId) = topicAndPartition.asTuple
-            val currentOffset = partitionMap.get(topicAndPartition)
-            if (currentOffset.isDefined) {
-              partitionData.error match {
-                case ErrorMapping.NoError =>
-                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
-                  val validBytes = messages.validBytes
-                  val newOffset = messages.lastOption match {
-                    case Some(m: MessageAndOffset) => m.nextOffset
-                    case None => currentOffset.get
-                  }
-                  partitionMap.put(topicAndPartition, newOffset)
-                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
-                  fetcherMetrics.byteRate.mark(validBytes)
+        /*
+         * Go through each partition in the response object.
+         * - for non-error/complete partitions, process the data.
+         * - for error and incomplete partitions, fold them into a
+         *   single FetchResult.
+         */
+        response.data.foldLeft(FetchResult(Set.empty, Set.empty))((foldedFetchResults, curr) => {
+          val topicAndPartition = curr._1
+          val partitionData = curr._2
+          val (topic, partitionId) = topicAndPartition.asTuple
+          val currentOffset = partitionMap.get(topicAndPartition)
+          if (currentOffset.isDefined) {
+            partitionData.error match {
+              case ErrorMapping.NoError =>
+                val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+                val validBytes = messages.validBytes
+                val newOffset = messages.lastOption match {
+                  case Some(m: MessageAndOffset) => m.nextOffset
+                  case None => currentOffset.get
+                }
+                partitionMap.put(topicAndPartition, newOffset)
+                fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
+                fetcherMetrics.byteRate.mark(validBytes)
+                if (validBytes == 0) {
+                  FetchResult(foldedFetchResults.errorPartitions,
+                              foldedFetchResults.incompletePartitions +
+                                TopicPartitionAndOffset(topic, partitionId, currentOffset.get))
+                } else {
                   // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                   processPartitionData(topicAndPartition, currentOffset.get, partitionData)
-                case ErrorMapping.OffsetOutOfRangeCode =>
-                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                  partitionMap.put(topicAndPartition, newOffset)
-                  warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
-                    .format(currentOffset.get, topic, partitionId, newOffset))
-                case _ =>
-                  warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
-                    ErrorMapping.exceptionFor(partitionData.error))
-                  partitionsWithError += topicAndPartition
-              }
+                  foldedFetchResults
+                }
+              case ErrorMapping.OffsetOutOfRangeCode =>
+                val newOffset = handleOffsetOutOfRange(topicAndPartition)
+                partitionMap.put(topicAndPartition, newOffset)
+                warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
+                  .format(currentOffset.get, topic, partitionId, newOffset))
+                foldedFetchResults
+              case _ =>
+                warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
+                  ErrorMapping.exceptionFor(partitionData.error))
+                FetchResult(foldedFetchResults.errorPartitions + topicAndPartition,
+                            foldedFetchResults.incompletePartitions)
             }
-        }
-      } finally {
+          } else {
+            foldedFetchResults
+          }
+        })
+      }
+      finally {
         partitionMapLock.unlock()
       }
     }
+    catch {
+      case t: Throwable =>
+        debug("error in fetch %s".format(fetchRequest), t)
+        if (isRunning.get) {
+          FetchResult(partitions.map(e => TopicAndPartition(e.topic, e.partition)), Set.empty)
+        } else {
+          FetchResult(Set.empty, Set.empty)
+        }
+
+    }
+
+  }
+
+  override def doWork() {
+    partitionMapLock.lock()
+    val partitionsToFetch: Set[TopicPartitionAndOffset] = try {
+      while (partitionMap.isEmpty)
+        partitionMapCond.await()
+      partitionMap.iterator.map {
+        case (topicAndPartition, offset) =>
+          TopicPartitionAndOffset(topicAndPartition.topic, topicAndPartition.partition, offset)
+      }.toSet
+    }
+    finally {
+      partitionMapLock.unlock()
+    }
+
+    val FetchResult(errorPartitions1, incompletePartitions1) = fetch(partitionsToFetch, fetchSize)
+
+    val FetchResult(errorPartitions2, incompletePartitions2) = if (incompletePartitions1.nonEmpty) {
+      logger.warn("Retrying fetch serially with fetch size %d for incomplete partitions %s."
+        .format(fetchSize, incompletePartitions1))
+      incompletePartitions1.foldLeft(FetchResult(Set.empty, Set.empty))((foldedFetchResults, curr) => {
+        val thisFetchResult = fetch(Set(curr), fetchSize)
+        FetchResult(foldedFetchResults.errorPartitions ++ thisFetchResult.errorPartitions,
+                    foldedFetchResults.incompletePartitions ++ thisFetchResult.incompletePartitions)
+      })
+    } else {
+      FetchResult(Set.empty, Set.empty)
+    }
+    if (incompletePartitions2.nonEmpty)
+      logger.warn("After retrying fetch, the following partitions are still incomplete: " + incompletePartitions2)

-    if(partitionsWithError.size > 0) {
-      debug("handling partitions with error for %s".format(partitionsWithError))
-      handlePartitionsWithErrors(partitionsWithError)
+    val allErrorPartitions = errorPartitions1 ++ errorPartitions2
+    if(allErrorPartitions.nonEmpty) {
+      debug("handling partitions with error for %s".format(allErrorPartitions))
+      handlePartitionsWithErrors(allErrorPartitions)
     }
   }
diff --git a/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala b/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3488908..6a31d08 100644
--- a/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -146,7 +146,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the socket receive buffer for network requests */
   val replicaSocketBufferSize = props.getInt("replica.socket.buffersize", ConsumerConfig.SocketBufferSize)

-  /* the number of byes of messages to attempt to fetch */
+  /**
+   * The number of bytes allocated for a multi-fetch request (to be divided
+   * evenly across all partitions in the fetch request).
+   */
   val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize)

   /* max wait time for each fetcher request issued by follower replicas*/
diff --git a/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index d0e3590..1c3e6ed 100644
--- a/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -23,15 +23,19 @@ import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
 import kafka.serializer.StringEncoder
 import kafka.admin.CreateTopicCommand
-import kafka.utils.TestUtils
+import kafka.utils.{Logging, TestUtils}
 import junit.framework.Assert._

-class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
+class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   val props = createBrokerConfigs(2)
-  val configs = props.map(p => new KafkaConfig(p))
+  val configs = props.map(p => new KafkaConfig(p) {
+    override val flushInterval = 1
+    override val replicaFetchSize = 100
+  })
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"
+  val topic3 = "longfoobar"

   override def setUp() {
     super.setUp()
@@ -47,24 +51,27 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
     val partition = 0
     val testMessageList1 = List("test1", "test2", "test3", "test4")
     val testMessageList2 = List("test5", "test6", "test7", "test8")
+    val testMessageList3 = List("averylongmessage")

     // create a topic and partition and await leadership
-    for (topic <- List(topic1,topic2)) {
+    for (topic <- List(topic1, topic2, topic3)) {
       CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     }

     // send test messages to leader
-    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), 
+    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs),
                                                             new StringEncoder(), new StringEncoder())
-    val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m))
+    val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++
+                   testMessageList2.map(m => new KeyedMessage(topic2, m, m)) ++
+                   testMessageList3.map(m => new KeyedMessage(topic3, m, m))
     producer.send(messages:_*)
     producer.close()

     def logsMatch(): Boolean = {
       var result = true
-      for (topic <- List(topic1, topic2)) {
+      for (topic <- List(topic1, topic2, topic3)) {
         val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset
         result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total && (expectedOffset == item.getLogManager().getLog(topic, partition).get.logEndOffset) }