From ee6c453a6b66cbebd4be61d770331da8a38969cb Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Tue, 17 Mar 2015 16:00:42 -0700
Subject: [PATCH 1/4] KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.

---
 .../scala/kafka/server/AbstractFetcherThread.scala | 131 ++++++++++++---
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   2 +-
 2 files changed, 79 insertions(+), 54 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index f178527..0bd9317 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -22,7 +22,7 @@ import kafka.utils.{Pool, ShutdownableThread}
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
-import kafka.utils.CoreUtils.inLock
+import kafka.utils.Utils.inLock
 import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
@@ -36,12 +36,11 @@ import com.yammer.metrics.core.Gauge
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-
 abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndPoint, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
-  private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
+  private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
@@ -78,11 +77,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke

   override def doWork() {
     inLock(partitionMapLock) {
       if (partitionMap.isEmpty)
-        partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
+        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
       partitionMap.foreach {
-        case((topicAndPartition, offset)) =>
-          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
-                         offset, fetchSize)
+        case((topicAndPartition, partitionFetchState)) =>
+          if(partitionFetchState.isActive)
+            fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+                           partitionFetchState.offset, fetchSize)
       }
     }
@@ -116,53 +116,53 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
         response.data.foreach {
           case(topicAndPartition, partitionData) =>
             val (topic, partitionId) = topicAndPartition.asTuple
-            val currentOffset = partitionMap.get(topicAndPartition)
-            // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
-            if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
-              partitionData.error match {
-                case ErrorMapping.NoError =>
-                  try {
-                    val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
-                    val validBytes = messages.validBytes
-                    val newOffset = messages.shallowIterator.toSeq.lastOption match {
-                      case Some(m: MessageAndOffset) => m.nextOffset
-                      case None => currentOffset.get
+            partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =>
+              // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
+              if (fetchRequest.requestInfo(topicAndPartition).offset == currentPartitionFetchState.offset) {
+                partitionData.error match {
+                  case ErrorMapping.NoError =>
+                    try {
+                      val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+                      val validBytes = messages.validBytes
+                      val newOffset = messages.shallowIterator.toSeq.lastOption match {
+                        case Some(m: MessageAndOffset) => m.nextOffset
+                        case None => currentPartitionFetchState.offset
+                      }
+                      partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
+                      fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
+                      fetcherStats.byteRate.mark(validBytes)
+                      // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
+                      processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
+                    } catch {
+                      case ime: InvalidMessageException =>
+                        // we log the error and continue. This ensures two things
+                        // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partitions to also lag
+                        // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
+                        // should get fixed in the subsequent fetches
+                        logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage)
+                      case e: Throwable =>
+                        throw new KafkaException("error processing data for partition [%s,%d] offset %d"
+                          .format(topic, partitionId, currentPartitionFetchState.offset), e)
+                    }
+                  case ErrorMapping.OffsetOutOfRangeCode =>
+                    try {
+                      val newOffset = handleOffsetOutOfRange(topicAndPartition)
+                      partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
+                      error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
+                        .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
+                    } catch {
+                      case e: Throwable =>
+                        error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                        partitionsWithError += topicAndPartition
                     }
-                    partitionMap.put(topicAndPartition, newOffset)
-                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
-                    fetcherStats.byteRate.mark(validBytes)
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    processPartitionData(topicAndPartition, currentOffset.get, partitionData)
-                  } catch {
-                    case ime: InvalidMessageException =>
-                      // we log the error and continue. This ensures two things
-                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
-                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
-                      // should get fixed in the subsequent fetches
-                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
-                    case e: Throwable =>
-                      throw new KafkaException("error processing data for partition [%s,%d] offset %d"
-                        .format(topic, partitionId, currentOffset.get), e)
-                  }
-                case ErrorMapping.OffsetOutOfRangeCode =>
-                  try {
-                    val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                    partitionMap.put(topicAndPartition, newOffset)
-                    error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
-                      .format(currentOffset.get, topic, partitionId, newOffset))
-                  } catch {
-                    case e: Throwable =>
-                      error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                  case _ =>
+                    if (isRunning.get) {
+                      error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
+                        ErrorMapping.exceptionFor(partitionData.error).getClass))
                       partitionsWithError += topicAndPartition
-                  }
-                case _ =>
-                  if (isRunning.get) {
-                    error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
-                      ErrorMapping.exceptionFor(partitionData.error).getClass))
-                    partitionsWithError += topicAndPartition
-                  }
-              }
-            }
+                    }
+                }
+              })
         }
       }
     }
@@ -181,7 +181,23 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
       if (!partitionMap.contains(topicAndPartition))
         partitionMap.put(
           topicAndPartition,
-          if (PartitionTopicInfo.isOffsetInvalid(offset)) handleOffsetOutOfRange(topicAndPartition) else offset)
+          if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(handleOffsetOutOfRange(topicAndPartition))
+          else new PartitionFetchState(offset)
+        )}
+      partitionMapCond.signalAll()
+    } finally {
+      partitionMapLock.unlock()
+    }
+  }
+
+  def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) {
+    partitionMapLock.lockInterruptibly()
+    try {
+      for (partition <- partitions) {
+        partitionMap.get(partition).foreach (currentPartitionFetchState =>
+          if(currentPartitionFetchState.isActive)
+            partitionMap.put(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
+        )
       }
       partitionMapCond.signalAll()
     } finally {
@@ -248,3 +264,12 @@ class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
 case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
   override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
 }
+
+case class PartitionFetchState(offset: Long, delay: DelayedItem) {
+
+  def this(offset: Long) = this(offset, new DelayedItem(0))
+
+  def isActive: Boolean = { delay.getDelay(TimeUnit.MILLISECONDS) == 0 }
+
+  override def toString = "%d-%b".format(offset, isActive)
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 2d84afa..b31b432 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -120,6 +120,6 @@ class ReplicaFetcherThread(name:String,
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
-    // no handler needed since the controller will make the changes accordingly
+    delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs)
   }
 }
--
1.9.5 (Apple Git-50.3)
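
For reference, the mechanism patch 1 introduces reduces to one idea: each partition's fetch offset is wrapped in a PartitionFetchState whose DelayedItem acts as a back-off timer, and doWork() only includes partitions whose timer has expired. The following standalone sketch traces that life cycle; the DelayedItem below is a simplified stand-in for kafka.utils.DelayedItem, not the real class.

    import java.util.concurrent.TimeUnit

    // Simplified stand-in for kafka.utils.DelayedItem: remaining delay, floored at zero.
    class DelayedItem(delayMs: Long) {
      private val dueMs = System.currentTimeMillis + delayMs
      def getDelay(unit: TimeUnit): Long =
        unit.convert(math.max(dueMs - System.currentTimeMillis, 0L), TimeUnit.MILLISECONDS)
    }

    // Mirrors the case class added by the patch: a partition is "active"
    // (fetchable) once its back-off delay has fully elapsed.
    case class PartitionFetchState(offset: Long, delay: DelayedItem) {
      def this(offset: Long) = this(offset, new DelayedItem(0))
      def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
    }

    object PartitionFetchStateDemo extends App {
      var state = new PartitionFetchState(42L)                         // fresh partition: active
      println(state.isActive)                                          // true  -> included in the fetch request
      state = PartitionFetchState(state.offset, new DelayedItem(500))  // what delayPartitions() does
      println(state.isActive)                                          // false -> skipped by doWork()
      Thread.sleep(600)
      println(state.isActive)                                          // true  -> fetching resumes
    }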
From a307dedbef65e695a38803155f960faec9190e8b Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Fri, 27 Mar 2015 15:18:53 -0700
Subject: [PATCH 2/4] KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.

---
 core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 0bd9317..ec28a5b 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -45,6 +45,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
   private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
+  private var allPartitionsInactive: Boolean = true
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
   val fetchRequestBuilder = new FetchRequestBuilder().
@@ -76,16 +77,20 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke

   override def doWork() {
     inLock(partitionMapLock) {
-      if (partitionMap.isEmpty)
-        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
+      allPartitionsInactive = true
       partitionMap.foreach {
         case((topicAndPartition, partitionFetchState)) =>
-          if(partitionFetchState.isActive)
+          if(partitionFetchState.isActive) {
+            allPartitionsInactive = false
             fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
               partitionFetchState.offset, fetchSize)
+          }
       }
     }

+    if (partitionMap.isEmpty || allPartitionsInactive)
+      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
+
     val fetchRequest = fetchRequestBuilder.build()
     if (!fetchRequest.requestInfo.isEmpty)
       processFetchRequest(fetchRequest)
--
1.9.5 (Apple Git-50.3)
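
A caveat with this revision: the new partitionMapCond.await(...) call now runs after the inLock(partitionMapLock) block has released the lock, and java.util.concurrent.locks.Condition.await throws IllegalMonitorStateException when the calling thread does not hold the associated lock (partitionMap.isEmpty is likewise read without the lock). A minimal demonstration of the correct and the failing pattern:

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock

    object ConditionAwaitDemo extends App {
      val lock = new ReentrantLock
      val cond = lock.newCondition()

      // Correct: the timed wait must run while the lock is held.
      lock.lock()
      try {
        cond.await(100, TimeUnit.MILLISECONDS)
      } finally {
        lock.unlock()
      }

      // Incorrect: awaiting without holding the lock fails at runtime.
      try {
        cond.await(100, TimeUnit.MILLISECONDS)
      } catch {
        case e: IllegalMonitorStateException =>
          println("await() requires the lock to be held: " + e)
      }
    }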
From 149463607198ac479c6dc93d345874b84285b013 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Fri, 27 Mar 2015 17:01:09 -0700
Subject: [PATCH 3/4] KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.

---
 core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index ec28a5b..d058e3e 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -45,7 +45,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
   private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
-  private var allPartitionsInactive: Boolean = true
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
   val fetchRequestBuilder = new FetchRequestBuilder().
@@ -76,8 +75,9 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }

   override def doWork() {
+    // check if all partitions are inactive
+    var allPartitionsInactive = true
     inLock(partitionMapLock) {
-      allPartitionsInactive = true
       partitionMap.foreach {
         case((topicAndPartition, partitionFetchState)) =>
           if(partitionFetchState.isActive) {
@@ -88,8 +88,10 @@
       }
     }

-    if (partitionMap.isEmpty || allPartitionsInactive)
+    if (partitionMap.isEmpty || allPartitionsInactive) {
+      trace("Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
       partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
+    }

     val fetchRequest = fetchRequestBuilder.build()
     if (!fetchRequest.requestInfo.isEmpty)
@@ -270,6 +272,9 @@ case class ClientIdTopicPartition(clientId: String, topic: String, partitionId:
   override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
 }

+/**
+ * Case class to keep the partition offset and its state (active or inactive).
+ */
 case class PartitionFetchState(offset: Long, delay: DelayedItem) {

   def this(offset: Long) = this(offset, new DelayedItem(0))
--
1.9.5 (Apple Git-50.3)
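
The delayPartitions helper these revisions build on re-stamps only partitions that are currently active, so a partition already sitting out a back-off keeps its original deadline rather than having it extended. A small simulation of that rule, using the same stand-in types as the sketch after patch 1, with TopicAndPartition reduced to a plain (String, Int) pair:

    import java.util.concurrent.TimeUnit
    import scala.collection.mutable

    // Stand-ins as in the earlier sketch.
    class DelayedItem(delayMs: Long) {
      private val dueMs = System.currentTimeMillis + delayMs
      def getDelay(unit: TimeUnit): Long =
        unit.convert(math.max(dueMs - System.currentTimeMillis, 0L), TimeUnit.MILLISECONDS)
    }
    case class PartitionFetchState(offset: Long, delay: DelayedItem) {
      def this(offset: Long) = this(offset, new DelayedItem(0))
      def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
    }

    object DelayPartitionsDemo extends App {
      val partitionMap = mutable.HashMap[(String, Int), PartitionFetchState](
        ("topicA", 0) -> new PartitionFetchState(10L),                    // active
        ("topicA", 1) -> PartitionFetchState(20L, new DelayedItem(5000))) // already delayed ~5 s

      // Same rule as AbstractFetcherThread.delayPartitions: only active partitions are re-stamped.
      def delayPartitions(partitions: Iterable[(String, Int)], delayMs: Long): Unit =
        for (p <- partitions; state <- partitionMap.get(p) if state.isActive)
          partitionMap.put(p, PartitionFetchState(state.offset, new DelayedItem(delayMs)))

      delayPartitions(partitionMap.keys.toList, 1000)
      partitionMap.foreach { case (p, s) =>
        // ("topicA", 0) now waits ~1 s; ("topicA", 1) keeps its original ~5 s deadline.
        println(s"$p remaining ms: ${s.delay.getDelay(TimeUnit.MILLISECONDS)}")
      }
    }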
From 401b259abe37fb7a0bcc3117efbe56a869ef3235 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Tue, 7 Apr 2015 08:40:29 -0700
Subject: [PATCH 4/4] KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.

---
 .../scala/kafka/server/AbstractFetcherThread.scala | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index d058e3e..a439046 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -22,7 +22,8 @@ import kafka.utils.{Pool, ShutdownableThread}
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
-import kafka.utils.Utils.inLock
+import kafka.utils.DelayedItem
+import kafka.utils.CoreUtils.inLock
 import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
@@ -75,27 +76,24 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }

   override def doWork() {
-    // check if all partitions are inactive
-    var allPartitionsInactive = true
+
     inLock(partitionMapLock) {
       partitionMap.foreach {
         case((topicAndPartition, partitionFetchState)) =>
-          if(partitionFetchState.isActive) {
-            allPartitionsInactive = false
+          if(partitionFetchState.isActive)
            fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
              partitionFetchState.offset, fetchSize)
-          }
       }
     }

-    if (partitionMap.isEmpty || allPartitionsInactive) {
-      trace("Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
-      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
-    }
-
     val fetchRequest = fetchRequestBuilder.build()
+
     if (!fetchRequest.requestInfo.isEmpty)
       processFetchRequest(fetchRequest)
+    else {
+      trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
+      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
+    }
   }

 private def processFetchRequest(fetchRequest: FetchRequest) {
--
1.9.5 (Apple Git-50.3)
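
Taken together, the series leaves the fetch loop with a simple contract: build a request from the currently active partitions; if the request is non-empty, fetch and push any errored partitions back into a delayed state; otherwise back off for fetchBackOffMs. A compressed, self-contained rendering of that contract under the same stand-in types, with Thread.sleep standing in for the condition-variable wait (which, per the note on patch 2, must otherwise run under partitionMapLock) and the fetch itself reduced to a callback:

    import java.util.concurrent.TimeUnit
    import scala.collection.mutable

    class DelayedItem(delayMs: Long) {
      private val dueMs = System.currentTimeMillis + delayMs
      def getDelay(unit: TimeUnit): Long =
        unit.convert(math.max(dueMs - System.currentTimeMillis, 0L), TimeUnit.MILLISECONDS)
    }
    case class PartitionFetchState(offset: Long, delay: DelayedItem) {
      def this(offset: Long) = this(offset, new DelayedItem(0))
      def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
    }

    // One doWork() pass of the final design: fetch active partitions or back off.
    class FetcherLoopSketch(fetchBackOffMs: Long) {
      private val partitionMap = mutable.HashMap[(String, Int), PartitionFetchState]()

      def addPartition(tp: (String, Int), offset: Long): Unit =
        partitionMap.getOrElseUpdate(tp, new PartitionFetchState(offset))

      // Mirrors ReplicaFetcherThread.handlePartitionsWithErrors -> delayPartitions.
      def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]): Unit =
        for (p <- partitions; s <- partitionMap.get(p) if s.isActive)
          partitionMap.put(p, PartitionFetchState(s.offset, new DelayedItem(fetchBackOffMs)))

      // `fetch` consumes (partition -> offset) pairs and returns the partitions that errored.
      def doWorkOnce(fetch: Map[(String, Int), Long] => Set[(String, Int)]): Unit = {
        val request = partitionMap.collect { case (tp, s) if s.isActive => tp -> s.offset }.toMap
        if (request.isEmpty)
          Thread.sleep(fetchBackOffMs)               // "There are no active partitions. Back off..."
        else
          handlePartitionsWithErrors(fetch(request)) // errored partitions sit out the next interval
      }
    }

    object FetcherLoopDemo extends App {
      val loop = new FetcherLoopSketch(fetchBackOffMs = 1000)
      loop.addPartition(("topicA", 0), offset = 0L)
      loop.doWorkOnce { req => println("fetching: " + req); Set(("topicA", 0)) } // simulate an error
      loop.doWorkOnce { req => println("fetching: " + req); Set.empty }          // backs off: nothing active
    }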