From f5316d45e60fea42d41b8dcf81ed1330136591fe Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Tue, 10 Mar 2015 22:32:20 -0700
Subject: [PATCH] KAFKA-1461. Replica fetcher thread does not implement any
 back-off behavior.

---
 .../src/main/scala/kafka/consumer/ConsumerFetcherThread.scala |  3 ++-
 core/src/main/scala/kafka/server/AbstractFetcherThread.scala  | 11 ++++++++---
 core/src/main/scala/kafka/server/KafkaConfig.scala            |  9 ++++++++-
 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala   |  1 +
 .../scala/unit/kafka/server/KafkaConfigConfigDefTest.scala    |  1 +
 core/src/test/scala/unit/kafka/utils/TestUtils.scala          |  1 +
 6 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index ee6139c..152fda5 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -29,7 +29,7 @@ class ConsumerFetcherThread(name: String,
                             sourceBroker: Broker,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
-        extends AbstractFetcherThread(name = name, 
+        extends AbstractFetcherThread(name = name,
                                       clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
@@ -38,6 +38,7 @@ class ConsumerFetcherThread(name: String,
                                       fetcherBrokerId = Request.OrdinaryConsumerId,
                                       maxWait = config.fetchWaitMaxMs,
                                       minBytes = config.fetchMinBytes,
+                                      fetchBackOffMs = config.refreshLeaderBackoffMs,
                                       isInterruptible = true) {
 
   // process fetched data
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8c281d4..427fc3d 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -37,7 +37,7 @@ import com.yammer.metrics.core.Gauge
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
 abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
-                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1,
+                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
@@ -66,7 +66,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
 
   override def shutdown(){
-    super.shutdown()
+    initiateShutdown()
+    inLock(partitionMapLock) {
+      partitionMapCond.signalAll()
+    }
+    awaitShutdown()
     simpleConsumer.close()
   }
 
@@ -99,6 +103,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
           partitionMapLock synchronized {
             partitionsWithError ++= partitionMap.keys
           }
+          // an error occurred while fetching partitions, so back off for a while
+          partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
       }
     }
     fetcherStats.requestRate.mark()
@@ -241,4 +247,3 @@ class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
 case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
   override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
 }
-
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 48e3362..46d21c7 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -94,6 +94,7 @@ object Defaults {
   val ReplicaFetchWaitMaxMs = 500
   val ReplicaFetchMinBytes = 1
   val NumReplicaFetchers = 1
+  val ReplicaFetchBackoffMs = 1000
   val ReplicaHighWatermarkCheckpointIntervalMs = 5000L
   val FetchPurgatoryPurgeIntervalRequests = 1000
   val ProducerPurgatoryPurgeIntervalRequests = 1000
@@ -199,6 +200,7 @@ object KafkaConfig {
   val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"
   val ReplicaFetchWaitMaxMsProp = "replica.fetch.wait.max.ms"
   val ReplicaFetchMinBytesProp = "replica.fetch.min.bytes"
+  val ReplicaFetchBackoffMsProp = "replica.fetch.backoff.ms"
   val NumReplicaFetchersProp = "num.replica.fetchers"
   val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms"
   val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests"
@@ -311,6 +313,7 @@ object KafkaConfig {
   val ReplicaFetchMinBytesDoc = "Minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs"
   val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate messages from a source broker. " +
   "Increasing this value can increase the degree of I/O parallelism in the follower broker."
+  val ReplicaFetchBackoffMsDoc = "The amount of time to sleep when a fetch partition error occurs."
   val ReplicaHighWatermarkCheckpointIntervalMsDoc = "The frequency with which the high watermark is saved out to disk"
   val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the fetch request purgatory"
   val ProducerPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the producer request purgatory"
@@ -429,6 +432,7 @@ object KafkaConfig {
       .define(ReplicaSocketReceiveBufferBytesProp, INT, Defaults.ReplicaSocketReceiveBufferBytes, HIGH, ReplicaSocketReceiveBufferBytesDoc)
       .define(ReplicaFetchMaxBytesProp, INT, Defaults.ReplicaFetchMaxBytes, HIGH, ReplicaFetchMaxBytesDoc)
       .define(ReplicaFetchWaitMaxMsProp, INT, Defaults.ReplicaFetchWaitMaxMs, HIGH, ReplicaFetchWaitMaxMsDoc)
+      .define(ReplicaFetchBackoffMsProp, INT, Defaults.ReplicaFetchBackoffMs, atLeast(0), MEDIUM, ReplicaFetchBackoffMsDoc)
       .define(ReplicaFetchMinBytesProp, INT, Defaults.ReplicaFetchMinBytes, HIGH, ReplicaFetchMinBytesDoc)
       .define(NumReplicaFetchersProp, INT, Defaults.NumReplicaFetchers, HIGH, NumReplicaFetchersDoc)
       .define(ReplicaHighWatermarkCheckpointIntervalMsProp, LONG, Defaults.ReplicaHighWatermarkCheckpointIntervalMs, HIGH, ReplicaHighWatermarkCheckpointIntervalMsDoc)
@@ -548,6 +552,7 @@ object KafkaConfig {
       replicaFetchMaxBytes = parsed.get(ReplicaFetchMaxBytesProp).asInstanceOf[Int],
       replicaFetchWaitMaxMs = parsed.get(ReplicaFetchWaitMaxMsProp).asInstanceOf[Int],
       replicaFetchMinBytes = parsed.get(ReplicaFetchMinBytesProp).asInstanceOf[Int],
+      replicaFetchBackoffMs = parsed.get(ReplicaFetchBackoffMsProp).asInstanceOf[Int],
       numReplicaFetchers = parsed.get(NumReplicaFetchersProp).asInstanceOf[Int],
       replicaHighWatermarkCheckpointIntervalMs = parsed.get(ReplicaHighWatermarkCheckpointIntervalMsProp).asInstanceOf[Long],
       fetchPurgatoryPurgeIntervalRequests = parsed.get(FetchPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int],
@@ -688,6 +693,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
                   val replicaFetchMaxBytes: Int = Defaults.ReplicaFetchMaxBytes,
                   val replicaFetchWaitMaxMs: Int = Defaults.ReplicaFetchWaitMaxMs,
                   val replicaFetchMinBytes: Int = Defaults.ReplicaFetchMinBytes,
+                  val replicaFetchBackoffMs: Int = Defaults.ReplicaFetchBackoffMs,
                   val numReplicaFetchers: Int = Defaults.NumReplicaFetchers,
                   val replicaHighWatermarkCheckpointIntervalMs: Long = Defaults.ReplicaHighWatermarkCheckpointIntervalMs,
                   val fetchPurgatoryPurgeIntervalRequests: Int = Defaults.FetchPurgatoryPurgeIntervalRequests,
@@ -856,6 +862,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     props.put(ReplicaFetchMaxBytesProp, replicaFetchMaxBytes.toString)
     props.put(ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
     props.put(ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)
+    props.put(ReplicaFetchBackoffMsProp, replicaFetchBackoffMs.toString)
     props.put(NumReplicaFetchersProp, numReplicaFetchers.toString)
     props.put(ReplicaHighWatermarkCheckpointIntervalMsProp, replicaHighWatermarkCheckpointIntervalMs.toString)
     props.put(FetchPurgatoryPurgeIntervalRequestsProp, fetchPurgatoryPurgeIntervalRequests.toString)
@@ -886,4 +893,4 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
 
     props
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index d6d14fb..96faa7b 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -37,6 +37,7 @@ class ReplicaFetcherThread(name:String,
                                 fetcherBrokerId = brokerConfig.brokerId,
                                 maxWait = brokerConfig.replicaFetchWaitMaxMs,
                                 minBytes = brokerConfig.replicaFetchMinBytes,
+                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                 isInterruptible = false) {
 
   // process fetched data
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index c124c8d..191251d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -120,6 +120,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
     Assert.assertEquals(expectedConfig.replicaFetchMaxBytes, actualConfig.replicaFetchMaxBytes)
     Assert.assertEquals(expectedConfig.replicaFetchWaitMaxMs, actualConfig.replicaFetchWaitMaxMs)
     Assert.assertEquals(expectedConfig.replicaFetchMinBytes, actualConfig.replicaFetchMinBytes)
+    Assert.assertEquals(expectedConfig.replicaFetchBackoffMs, actualConfig.replicaFetchBackoffMs)
     Assert.assertEquals(expectedConfig.numReplicaFetchers, actualConfig.numReplicaFetchers)
     Assert.assertEquals(expectedConfig.replicaHighWatermarkCheckpointIntervalMs, actualConfig.replicaHighWatermarkCheckpointIntervalMs)
     Assert.assertEquals(expectedConfig.fetchPurgatoryPurgeIntervalRequests, actualConfig.fetchPurgatoryPurgeIntervalRequests)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 52c7920..1682a77 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -164,6 +164,7 @@ object TestUtils extends Logging {
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
     props.put("replica.socket.timeout.ms", "1500")
+    props.put("controller.socket.timeout.ms", "1500")
     props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
     props.put("delete.topic.enable", enableDeleteTopic.toString)
     props
-- 
1.9.5 (Apple Git-50.3)
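
Note on the mechanism: the heart of this patch is the pairing of the timed
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) back-off with the
signalAll() added to shutdown(). On a partition fetch error the fetcher parks on
the condition for replica.fetch.backoff.ms instead of re-issuing the fetch in a
tight loop, and a shutting-down broker can wake it immediately. The following is
a minimal, self-contained sketch of that condition-variable back-off pattern;
the names here (BackOffFetcher, fetchOnce, isRunning) are hypothetical
illustrations, not code from this patch.

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock

    // Hypothetical sketch of the back-off pattern the hunks above introduce.
    // Only the lock/condition structure mirrors the Kafka change.
    class BackOffFetcher(backOffMs: Long) extends Thread {
      private val lock = new ReentrantLock()
      private val cond = lock.newCondition() // plays the role of partitionMapCond
      @volatile private var isRunning = true

      // Stand-in for a fetch attempt that may fail; returns true on success.
      private def fetchOnce(): Boolean = scala.util.Random.nextBoolean()

      override def run(): Unit = {
        while (isRunning) {
          if (!fetchOnce()) {
            // On error, park on the condition instead of spinning.
            lock.lock()
            try {
              if (isRunning)
                cond.await(backOffMs, TimeUnit.MILLISECONDS)
            } finally lock.unlock()
          }
        }
      }

      def shutdown(): Unit = {
        isRunning = false      // analogous to initiateShutdown()
        lock.lock()
        try cond.signalAll()   // wake a thread parked in its back-off wait
        finally lock.unlock()
        join()                 // analogous to awaitShutdown()
      }
    }

Signalling before joining matters: without it, a fetcher sleeping out a full
back-off interval (default 1000 ms per the ReplicaFetchBackoffMs hunk above)
would delay broker shutdown by up to that interval.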