From 8809eeedbcd6c023f181d6c1fe0b5739248fcb96 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Sun, 22 Feb 2015 21:03:20 -0800
Subject: [PATCH 1/2] KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.

---
 .../kafka/server/AbstractFetcherManager.scala      |   4 +-
 .../scala/kafka/server/AbstractFetcherThread.scala | 132 ++++++++++++---------
 core/src/main/scala/kafka/server/KafkaConfig.scala |   5 +
 .../main/scala/kafka/server/OffsetAndDelay.scala   |  35 ++++++
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   6 +-
 .../scala/unit/kafka/server/ReplicaFetchTest.scala |   1 +
 6 files changed, 127 insertions(+), 56 deletions(-)
 create mode 100644 core/src/main/scala/kafka/server/OffsetAndDelay.scala

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 20c00cb..042d3a0 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -85,7 +85,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
     }
 
     fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
-      topicAndPartition -> brokerAndInitOffset.initOffset
+      topicAndPartition -> new OffsetAndDelay(brokerAndInitOffset.initOffset)
    })
  }
}
@@ -128,4 +128,4 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
 
 case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
 
-case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
\ No newline at end of file
+case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8c281d4..e6bf2b3 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -23,6 +23,7 @@ import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import kafka.utils.Utils.inLock
+import kafka.utils.DelayedItem
 import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
 
@@ -40,7 +41,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
-  private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
+  private val partitionMap = new mutable.HashMap[TopicAndPartition, OffsetAndDelay] // a (topic, partition) -> offsetandstate map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
@@ -75,9 +76,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
       if (partitionMap.isEmpty)
         partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
       partitionMap.foreach {
-        case((topicAndPartition, offset)) =>
-          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
-            offset, fetchSize)
+        case((topicAndPartition, offsetAndState)) =>
+          if(offsetAndState.isActive)
+            fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+              offsetAndState.offset, fetchSize)
       }
     }
 
@@ -109,52 +111,57 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
       response.data.foreach {
         case(topicAndPartition, partitionData) =>
           val (topic, partitionId) = topicAndPartition.asTuple
-          val currentOffset = partitionMap.get(topicAndPartition)
-          // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
-          if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
-            partitionData.error match {
-              case ErrorMapping.NoError =>
-                try {
-                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
-                  val validBytes = messages.validBytes
-                  val newOffset = messages.shallowIterator.toSeq.lastOption match {
-                    case Some(m: MessageAndOffset) => m.nextOffset
-                    case None => currentOffset.get
-                  }
-                  partitionMap.put(topicAndPartition, newOffset)
-                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
-                  fetcherStats.byteRate.mark(validBytes)
-                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
-                } catch {
-                  case ime: InvalidMessageException =>
-                    // we log the error and continue. This ensures two things
-                    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
-                    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
-                    //    should get fixed in the subsequent fetches
-                    logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
-                  case e: Throwable =>
-                    throw new KafkaException("error processing data for partition [%s,%d] offset %d"
-                      .format(topic, partitionId, currentOffset.get), e)
+          val currentOffsetAndDelayOpt: Option[OffsetAndDelay] = partitionMap.get(topicAndPartition)
+          currentOffsetAndDelayOpt match {
+            case Some(currentOffsetAndDelay) =>
+              // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
+              if (fetchRequest.requestInfo(topicAndPartition).offset == currentOffsetAndDelay.offset) {
+                partitionData.error match {
+                  case ErrorMapping.NoError =>
+                    try {
+                      val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+                      val validBytes = messages.validBytes
+                      val newOffset = messages.shallowIterator.toSeq.lastOption match {
+                        case Some(m: MessageAndOffset) => m.nextOffset
+                        case None => currentOffsetAndDelay.offset
+                      }
+                      partitionMap.put(topicAndPartition, new OffsetAndDelay(newOffset))
+                      fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
+                      fetcherStats.byteRate.mark(validBytes)
+                      // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
+                      processPartitionData(topicAndPartition, currentOffsetAndDelay.offset, partitionData)
+                    } catch {
+                      case ime: InvalidMessageException =>
+                        // we log the error and continue. This ensures two things
+                        // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
+                        // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
+                        //    should get fixed in the subsequent fetches
+                        logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " +
+                          currentOffsetAndDelay.offset + " error " + ime.getMessage)
+                      case e: Throwable =>
+                        throw new KafkaException("error processing data for partition [%s,%d] offset %d"
+                          .format(topic, partitionId, currentOffsetAndDelay.offset), e)
+                    }
+                  case ErrorMapping.OffsetOutOfRangeCode =>
+                    try {
+                      val newOffset = handleOffsetOutOfRange(topicAndPartition)
+                      partitionMap.put(topicAndPartition, new OffsetAndDelay(newOffset))
+                      error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
+                        .format(currentOffsetAndDelay.offset, topic, partitionId, newOffset))
+                    } catch {
+                      case e: Throwable =>
+                        error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                        partitionsWithError += topicAndPartition
+                    }
+                  case _ =>
+                    if (isRunning.get) {
+                      error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
+                        ErrorMapping.exceptionFor(partitionData.error).getClass))
+                      partitionsWithError += topicAndPartition
+                    }
                 }
-              case ErrorMapping.OffsetOutOfRangeCode =>
-                try {
-                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                  partitionMap.put(topicAndPartition, newOffset)
-                  error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
-                    .format(currentOffset.get, topic, partitionId, newOffset))
-                } catch {
-                  case e: Throwable =>
-                    error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
-                    partitionsWithError += topicAndPartition
-                }
-              case _ =>
-                if (isRunning.get) {
-                  error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
-                    ErrorMapping.exceptionFor(partitionData.error).getClass))
-                  partitionsWithError += topicAndPartition
-                }
-            }
-          }
+              }
+            case None => // do nothing
          }
      }
    }
@@ -166,15 +173,35 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
    }
  }
-  def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) {
+  def addPartitions(partitionAndOffsets: Map[TopicAndPartition, OffsetAndDelay]) {
     partitionMapLock.lockInterruptibly()
     try {
-      for ((topicAndPartition, offset) <- partitionAndOffsets) {
+      for ((topicAndPartition, offsetAndState) <- partitionAndOffsets) {
         // If the partitionMap already has the topic/partition, then do not update the map with the old offset
         if (!partitionMap.contains(topicAndPartition))
           partitionMap.put(
             topicAndPartition,
-            if (PartitionTopicInfo.isOffsetInvalid(offset)) handleOffsetOutOfRange(topicAndPartition) else offset)
+            if (PartitionTopicInfo.isOffsetInvalid(offsetAndState.offset)) new OffsetAndDelay(handleOffsetOutOfRange(topicAndPartition))
+            else new OffsetAndDelay(offsetAndState.offset)
+          )
+      }
+      partitionMapCond.signalAll()
+    } finally {
+      partitionMapLock.unlock()
+    }
+  }
+
+  def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) {
+    partitionMapLock.lockInterruptibly()
+    try {
+      for (partition <- partitions) {
+        val currentOffsetAndDelayOpt: Option[OffsetAndDelay] = partitionMap.get(partition)
+        currentOffsetAndDelayOpt match {
+          case Some(currentOffsetAndDelay) =>
+            if(currentOffsetAndDelay.isActive) // add a delay only if it's active
+              partitionMap.put(partition, OffsetAndDelay(currentOffsetAndDelay.offset, new DelayedItem(delay)))
+          case None => // do nothing
+        }
       }
       partitionMapCond.signalAll()
     } finally {
       partitionMapLock.unlock()
@@ -241,4 +268,3 @@ class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
 case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
   override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
 }
-
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 14bf321..d49643a 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -270,6 +270,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency with which the high watermark is saved out to disk */
   val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
 
+  /** Before each replica fetch, the system needs time to recover from the state that caused the previous failure (leader
+   *  migration, connection timeouts, etc.). This config determines the amount of time to wait before retrying. */
+  val replicaFetcherRetryBackoffMs = props.getInt("replica.fetch.retry.backoff.ms", 10000)
+
+
   /* the purge interval (in number of requests) of the fetch request purgatory */
   val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 1000)
 
diff --git a/core/src/main/scala/kafka/server/OffsetAndDelay.scala b/core/src/main/scala/kafka/server/OffsetAndDelay.scala
new file mode 100644
index 0000000..e203b87
--- /dev/null
+++ b/core/src/main/scala/kafka/server/OffsetAndDelay.scala
@@ -0,0 +1,35 @@
+package kafka.server
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Convenience case class used by AbstractFetcherThread to track the fetch
+ * offset and back-off delay of each partition, to implement backoff retry
+ */
+
+import kafka.utils.DelayedItem
+import java.util.concurrent.TimeUnit
+
+case class OffsetAndDelay(offset: Long, delay: DelayedItem) {
+
+  def this(offset: Long) = this(offset, new DelayedItem(0))
+
+  def isActive: Boolean = { delay.getDelay(TimeUnit.MILLISECONDS) == 0 }
+
+  override def toString = "%d-%b".format(offset, isActive)
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 6879e73..19e10ad 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -24,6 +24,8 @@ import kafka.message.ByteBufferMessageSet
 import kafka.api.{OffsetRequest, FetchResponsePartitionData}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 
+import scala.collection.{mutable, Map}
+
 class ReplicaFetcherThread(name:String,
                            sourceBroker: Broker,
                            brokerConfig: KafkaConfig,
@@ -39,6 +41,7 @@ class ReplicaFetcherThread(name:String,
                         minBytes = brokerConfig.replicaFetchMinBytes,
                         isInterruptible = false) {
 
+  // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
     try {
@@ -119,6 +122,7 @@ class ReplicaFetcherThread(name:String,
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
-    // no handler needed since the controller will make the changes accordingly
+    delayPartitions(partitions, brokerConfig.replicaFetcherRetryBackoffMs)
   }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index da4bafc..b254368 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -73,4 +73,5 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
     }
     waitUntilTrue(logsMatch, "Broker logs should be identical")
   }
+
 }
-- 
1.9.3 (Apple Git-50)

From 61fa9c7fc4e8fd0a69eb4d66f138a0934041901a Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Tue, 24 Feb 2015 09:51:42 -0800
Subject: [PATCH 2/2] KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.

---
 core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index e6bf2b3..40fa661 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -41,7 +41,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
-  private val partitionMap = new mutable.HashMap[TopicAndPartition, OffsetAndDelay] // a (topic, partition) -> offsetandstate map
+  private val partitionMap = new mutable.HashMap[TopicAndPartition, OffsetAndDelay] // a (topic, partition) -> offsetanddelay map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
-- 
1.9.3 (Apple Git-50)
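
The mechanism the two patches introduce is easiest to see outside the diff: a partition's entry in partitionMap carries its fetch offset plus a DelayedItem; handlePartitionsWithErrors re-inserts the entry with a fresh delay, and the fetch-request builder loop skips any entry whose isActive is false until the delay expires. The sketch below is a minimal standalone rendering of that state machine, not code from the patches: SimpleDelayedItem is a simplified stand-in for kafka.utils.DelayedItem (assumed, like the real class, to clamp the remaining delay at zero), and the offset and delay values are arbitrary.

import java.util.concurrent.TimeUnit

// Simplified stand-in for kafka.utils.DelayedItem: reports the remaining
// delay, clamped at zero once the due time has passed.
class SimpleDelayedItem(delayMs: Long) {
  private val dueMs = System.currentTimeMillis + delayMs
  def getDelay(unit: TimeUnit): Long =
    unit.convert(math.max(dueMs - System.currentTimeMillis, 0L), TimeUnit.MILLISECONDS)
}

// Mirrors the patch's OffsetAndDelay: a partition is eligible for fetching
// ("active") only once its back-off delay has fully elapsed.
case class OffsetAndDelay(offset: Long, delay: SimpleDelayedItem) {
  def this(offset: Long) = this(offset, new SimpleDelayedItem(0))
  def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
}

object BackoffDemo extends App {
  // A freshly added partition carries no delay, so it is fetched immediately.
  var state = new OffsetAndDelay(42L)
  println("fresh:        isActive = " + state.isActive)  // true

  // Like handlePartitionsWithErrors -> delayPartitions(partitions, backoffMs):
  // keep the offset, attach a new delay; the fetch loop then skips this
  // partition until the delay expires.
  state = OffsetAndDelay(state.offset, new SimpleDelayedItem(100L))
  println("just delayed: isActive = " + state.isActive)  // false

  Thread.sleep(150L)
  println("after 150 ms: isActive = " + state.isActive)  // true again
}

With the patch's default replica.fetch.retry.backoff.ms of 10000, a partition whose fetch fails is skipped for roughly ten seconds of fetch rounds instead of being retried in a tight loop, which is exactly the back-off behavior KAFKA-1461 asks for.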