From 2b8b0f06c0b666ec1cc772134c915eeeff07031c Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Tue, 17 Mar 2015 16:00:42 -0700
Subject: [PATCH] KAFKA-1461. Replica fetcher thread does not implement any
 back-off behavior.

---
 .../scala/kafka/server/AbstractFetcherThread.scala | 131 +++++++++++++--------
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   2 +-
 2 files changed, 80 insertions(+), 53 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index e731df4..a54dc5d 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -23,6 +23,7 @@ import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import kafka.utils.Utils.inLock
+import kafka.utils.DelayedItem
 import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
@@ -37,10 +38,10 @@ import com.yammer.metrics.core.Gauge
  * Abstract class for fetching data from multiple partitions from the same broker.
  */
 abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
-                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0,
+                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
-  private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
+  private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
@@ -77,11 +78,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   override def doWork() {
     inLock(partitionMapLock) {
       if (partitionMap.isEmpty)
-        partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
+        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
       partitionMap.foreach {
-        case((topicAndPartition, offset)) =>
-          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
-            offset, fetchSize)
+        case((topicAndPartition, partitionFetchState)) =>
+          if(partitionFetchState.isActive)
+            fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+              partitionFetchState.offset, fetchSize)
       }
     }
@@ -115,53 +117,53 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
         response.data.foreach {
           case(topicAndPartition, partitionData) =>
             val (topic, partitionId) = topicAndPartition.asTuple
-            val currentOffset = partitionMap.get(topicAndPartition)
-            // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
-            if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
-              partitionData.error match {
-                case ErrorMapping.NoError =>
-                  try {
-                    val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
-                    val validBytes = messages.validBytes
-                    val newOffset = messages.shallowIterator.toSeq.lastOption match {
-                      case Some(m: MessageAndOffset) => m.nextOffset
-                      case None => currentOffset.get
+            partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =>
+              // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
+              if (fetchRequest.requestInfo(topicAndPartition).offset == currentPartitionFetchState.offset) {
+                partitionData.error match {
+                  case ErrorMapping.NoError =>
+                    try {
+                      val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+                      val validBytes = messages.validBytes
+                      val newOffset = messages.shallowIterator.toSeq.lastOption match {
+                        case Some(m: MessageAndOffset) => m.nextOffset
+                        case None => currentPartitionFetchState.offset
+                      }
+                      partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
+                      fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
+                      fetcherStats.byteRate.mark(validBytes)
+                      // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
+                      processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
+                    } catch {
+                      case ime: InvalidMessageException =>
+                        // we log the error and continue. This ensures two things
+                        // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
+                        // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
+                        // should get fixed in the subsequent fetches
+                        logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage)
+                      case e: Throwable =>
+                        throw new KafkaException("error processing data for partition [%s,%d] offset %d"
+                          .format(topic, partitionId, currentPartitionFetchState.offset), e)
                     }
-                    partitionMap.put(topicAndPartition, newOffset)
-                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
-                    fetcherStats.byteRate.mark(validBytes)
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    processPartitionData(topicAndPartition, currentOffset.get, partitionData)
-                  } catch {
-                    case ime: InvalidMessageException =>
-                      // we log the error and continue. This ensures two things
-                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
-                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
-                      // should get fixed in the subsequent fetches
-                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
-                    case e: Throwable =>
-                      throw new KafkaException("error processing data for partition [%s,%d] offset %d"
-                        .format(topic, partitionId, currentOffset.get), e)
-                  }
-                case ErrorMapping.OffsetOutOfRangeCode =>
-                  try {
-                    val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                    partitionMap.put(topicAndPartition, newOffset)
-                    error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
-                      .format(currentOffset.get, topic, partitionId, newOffset))
-                  } catch {
-                    case e: Throwable =>
-                      error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                  case ErrorMapping.OffsetOutOfRangeCode =>
+                    try {
+                      val newOffset = handleOffsetOutOfRange(topicAndPartition)
+                      partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
+                      error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
+                        .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
+                    } catch {
+                      case e: Throwable =>
+                        error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                         partitionsWithError += topicAndPartition
-                  }
-                case _ =>
-                  if (isRunning.get) {
-                    error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
-                      ErrorMapping.exceptionFor(partitionData.error).getClass))
-                    partitionsWithError += topicAndPartition
-                  }
-              }
-            }
+                    }
+                  case _ =>
+                    if (isRunning.get) {
+                      error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
+                        ErrorMapping.exceptionFor(partitionData.error).getClass))
+                      partitionsWithError += topicAndPartition
+                    }
+                }
+              })
         }
       }
     }
@@ -180,7 +182,23 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
         if (!partitionMap.contains(topicAndPartition))
           partitionMap.put(
             topicAndPartition,
-            if (PartitionTopicInfo.isOffsetInvalid(offset)) handleOffsetOutOfRange(topicAndPartition) else offset)
+            if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(handleOffsetOutOfRange(topicAndPartition))
+            else new PartitionFetchState(offset)
+          )}
+      partitionMapCond.signalAll()
+    } finally {
+      partitionMapLock.unlock()
+    }
+  }
+
+  def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) {
+    partitionMapLock.lockInterruptibly()
+    try {
+      for (partition <- partitions) {
+        partitionMap.get(partition).foreach (currentPartitionFetchState =>
+          if(currentPartitionFetchState.isActive)
+            partitionMap.put(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
+        )
       }
       partitionMapCond.signalAll()
     } finally {
       partitionMapLock.unlock()
     }
   }
@@ -247,3 +265,12 @@ class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
 case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
   override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
 }
+
+case class PartitionFetchState(offset: Long, delay: DelayedItem) {
+
+  def this(offset: Long) = this(offset, new DelayedItem(0))
+
+  def isActive: Boolean = { delay.getDelay(TimeUnit.MILLISECONDS) == 0 }
+
+  override def toString = "%d-%b".format(offset, isActive)
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 96faa7b..daf21d9 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -120,6 +120,6 @@ class ReplicaFetcherThread(name:String,
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
-    // no handler needed since the controller will make the changes accordingly
+    delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs)
   }
 }
--
1.9.5 (Apple Git-50.3)
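For readers who want to see the back-off mechanics in isolation, here is a self-contained Scala sketch of the state machine the patch introduces: a per-partition fetch state that records an offset plus an expiry, an isActive test that gates whether the partition goes into the next fetch request, and a delayPartitions-style helper that re-arms the delay. SimpleDelayedItem, FetchState and BackOffSketch below are simplified stand-ins written only for illustration; they are not the kafka.utils.DelayedItem or PartitionFetchState classes from the patch, which key the map by TopicAndPartition and guard it with partitionMapLock.

import java.util.concurrent.TimeUnit

// Hypothetical, simplified stand-in for kafka.utils.DelayedItem: remembers when the delay expires.
class SimpleDelayedItem(delayMs: Long) {
  private val dueMs = System.currentTimeMillis() + delayMs
  def getDelay(unit: TimeUnit): Long =
    unit.convert(math.max(dueMs - System.currentTimeMillis(), 0L), TimeUnit.MILLISECONDS)
}

// Mirrors PartitionFetchState: a partition is "active" (fetchable) once its delay has expired.
case class FetchState(offset: Long, delay: SimpleDelayedItem = new SimpleDelayedItem(0)) {
  def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
}

object BackOffSketch {
  private val partitionStates = scala.collection.mutable.HashMap[String, FetchState]()

  // Analogue of delayPartitions: keep the same offset but re-insert it with a fresh delay,
  // so the fetch loop skips the partition until the delay expires.
  def delayPartitions(partitions: Iterable[String], delayMs: Long): Unit =
    for (p <- partitions; state <- partitionStates.get(p) if state.isActive)
      partitionStates.put(p, FetchState(state.offset, new SimpleDelayedItem(delayMs)))

  // Analogue of the doWork() loop: only active partitions make it into the fetch request.
  def partitionsToFetch: Iterable[(String, Long)] =
    partitionStates.collect { case (p, state) if state.isActive => (p, state.offset) }

  def main(args: Array[String]): Unit = {
    partitionStates.put("topic-0", FetchState(42L))
    println(partitionsToFetch)            // topic-0 is fetchable
    delayPartitions(Seq("topic-0"), 1000L)
    println(partitionsToFetch)            // empty: topic-0 is backed off for roughly one second
  }
}

The design point the sketch tries to make visible: the back-off is tracked per partition rather than by sleeping the whole fetcher thread, so a partition whose fetch just failed is skipped for one back-off interval while healthy partitions keep fetching.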
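A second, smaller sketch covers the wait/wake pattern around partitionMapCond, under the assumption that the only goal is a bounded idle sleep that another thread can cut short. The names below are illustrative rather than the Kafka ones; in the patch the bound is the fetchBackOffMs constructor parameter (which replaces the old hard-coded 200 ms await) and the wake-up comes from addPartitions and delayPartitions calling partitionMapCond.signalAll().

import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object BoundedWaitSketch {
  private val lock = new ReentrantLock
  private val cond = lock.newCondition()
  private val fetchBackOffMs = 1000L

  // Fetcher side: when there is nothing to fetch, sleep at most fetchBackOffMs.
  def waitForWork(hasWork: => Boolean): Unit = {
    lock.lock()
    try {
      if (!hasWork)
        cond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    } finally lock.unlock()
  }

  // Caller side: wake the fetcher early, e.g. after partitions were added or delayed.
  def signalWork(): Unit = {
    lock.lock()
    try cond.signalAll()
    finally lock.unlock()
  }
}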