From 04773acf2695f9dcffa342ce6953d7146c55cd1b Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Tue, 10 Mar 2015 22:32:20 -0700
Subject: [PATCH] KAFKA-1461. Replica fetcher thread does not implement any
 back-off behavior.

---
 core/src/main/scala/kafka/server/AbstractFetcherThread.scala     | 8 ++++++--
 core/src/main/scala/kafka/server/KafkaConfig.scala               | 9 ++++++++-
 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala      | 2 ++
 .../test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala  | 1 +
 core/src/test/scala/unit/kafka/utils/TestUtils.scala             | 2 ++
 5 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8c281d4..d31490e 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -30,6 +30,7 @@ import scala.collection.{mutable, Set, Map}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.CountDownLatch
 
 import com.yammer.metrics.core.Gauge
 
@@ -37,7 +38,7 @@ import com.yammer.metrics.core.Gauge
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
 abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
-                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1,
+                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Long = 0,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
   private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
@@ -45,6 +46,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
   private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
+  private val backOffWaitLatch = new CountDownLatch(1)
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
   val fetchRequestBuilder = new FetchRequestBuilder().
@@ -66,6 +68,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
 
   override def shutdown(){
+    backOffWaitLatch.countDown()
     super.shutdown()
     simpleConsumer.close()
   }
@@ -153,6 +156,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                     error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                       ErrorMapping.exceptionFor(partitionData.error).getClass))
                     partitionsWithError += topicAndPartition
+                    // an error occurred while fetching partitions; back off for a while before retrying
+                    backOffWaitLatch.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
               }
           }
         }
@@ -241,4 +246,3 @@ class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
 case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
   override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
 }
-
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 48e3362..e6e4c1f 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -94,6 +94,7 @@ object Defaults {
   val ReplicaFetchWaitMaxMs = 500
   val ReplicaFetchMinBytes = 1
   val NumReplicaFetchers = 1
+  val ReplicaFetchBackoffMs = 15 * 1000
   val ReplicaHighWatermarkCheckpointIntervalMs = 5000L
   val FetchPurgatoryPurgeIntervalRequests = 1000
   val ProducerPurgatoryPurgeIntervalRequests = 1000
@@ -199,6 +200,7 @@ object KafkaConfig {
   val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"
   val ReplicaFetchWaitMaxMsProp = "replica.fetch.wait.max.ms"
   val ReplicaFetchMinBytesProp = "replica.fetch.min.bytes"
+  val ReplicaFetchBackoffMsProp = "replica.fetch.backoff.ms"
   val NumReplicaFetchersProp = "num.replica.fetchers"
   val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms"
   val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests"
@@ -311,6 +313,7 @@ object KafkaConfig {
   val ReplicaFetchMinBytesDoc = "Minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs"
   val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate messages from a source broker. " +
     "Increasing this value can increase the degree of I/O parallelism in the follower broker."
+  val ReplicaFetchBackoffMsDoc = "The amount of time to sleep when a fetch partition error occurs."
   val ReplicaHighWatermarkCheckpointIntervalMsDoc = "The frequency with which the high watermark is saved out to disk"
   val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the fetch request purgatory"
   val ProducerPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the producer request purgatory"
@@ -429,6 +432,7 @@ object KafkaConfig {
       .define(ReplicaSocketReceiveBufferBytesProp, INT, Defaults.ReplicaSocketReceiveBufferBytes, HIGH, ReplicaSocketReceiveBufferBytesDoc)
       .define(ReplicaFetchMaxBytesProp, INT, Defaults.ReplicaFetchMaxBytes, HIGH, ReplicaFetchMaxBytesDoc)
       .define(ReplicaFetchWaitMaxMsProp, INT, Defaults.ReplicaFetchWaitMaxMs, HIGH, ReplicaFetchWaitMaxMsDoc)
+      .define(ReplicaFetchBackoffMsProp, LONG, Defaults.ReplicaFetchBackoffMs, atLeast(0), MEDIUM, ReplicaFetchBackoffMsDoc)
       .define(ReplicaFetchMinBytesProp, INT, Defaults.ReplicaFetchMinBytes, HIGH, ReplicaFetchMinBytesDoc)
       .define(NumReplicaFetchersProp, INT, Defaults.NumReplicaFetchers, HIGH, NumReplicaFetchersDoc)
       .define(ReplicaHighWatermarkCheckpointIntervalMsProp, LONG, Defaults.ReplicaHighWatermarkCheckpointIntervalMs, HIGH, ReplicaHighWatermarkCheckpointIntervalMsDoc)
@@ -548,6 +552,7 @@ object KafkaConfig {
       replicaFetchMaxBytes = parsed.get(ReplicaFetchMaxBytesProp).asInstanceOf[Int],
       replicaFetchWaitMaxMs = parsed.get(ReplicaFetchWaitMaxMsProp).asInstanceOf[Int],
       replicaFetchMinBytes = parsed.get(ReplicaFetchMinBytesProp).asInstanceOf[Int],
+      replicaFetchBackoffMs = parsed.get(ReplicaFetchBackoffMsProp).asInstanceOf[Long],
       numReplicaFetchers = parsed.get(NumReplicaFetchersProp).asInstanceOf[Int],
       replicaHighWatermarkCheckpointIntervalMs = parsed.get(ReplicaHighWatermarkCheckpointIntervalMsProp).asInstanceOf[Long],
       fetchPurgatoryPurgeIntervalRequests = parsed.get(FetchPurgatoryPurgeIntervalRequestsProp).asInstanceOf[Int],
@@ -688,6 +693,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     val replicaFetchMaxBytes: Int = Defaults.ReplicaFetchMaxBytes,
     val replicaFetchWaitMaxMs: Int = Defaults.ReplicaFetchWaitMaxMs,
     val replicaFetchMinBytes: Int = Defaults.ReplicaFetchMinBytes,
+    val replicaFetchBackoffMs: Long = Defaults.ReplicaFetchBackoffMs,
     val numReplicaFetchers: Int = Defaults.NumReplicaFetchers,
     val replicaHighWatermarkCheckpointIntervalMs: Long = Defaults.ReplicaHighWatermarkCheckpointIntervalMs,
     val fetchPurgatoryPurgeIntervalRequests: Int = Defaults.FetchPurgatoryPurgeIntervalRequests,
@@ -856,6 +862,7 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     props.put(ReplicaFetchMaxBytesProp, replicaFetchMaxBytes.toString)
     props.put(ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
     props.put(ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)
+    props.put(ReplicaFetchBackoffMsProp, replicaFetchBackoffMs.toString)
     props.put(NumReplicaFetchersProp, numReplicaFetchers.toString)
     props.put(ReplicaHighWatermarkCheckpointIntervalMsProp, replicaHighWatermarkCheckpointIntervalMs.toString)
     props.put(FetchPurgatoryPurgeIntervalRequestsProp, fetchPurgatoryPurgeIntervalRequests.toString)
@@ -886,4 +893,4 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
 
     props
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index d6d14fb..0662862 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -23,6 +23,7 @@ import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{OffsetRequest, FetchResponsePartitionData}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
+import java.util.concurrent.CountDownLatch
 
 class ReplicaFetcherThread(name:String,
                            sourceBroker: Broker,
@@ -37,6 +38,7 @@ class ReplicaFetcherThread(name:String,
                         fetcherBrokerId = brokerConfig.brokerId,
                         maxWait = brokerConfig.replicaFetchWaitMaxMs,
                         minBytes = brokerConfig.replicaFetchMinBytes,
+                        fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                         isInterruptible = false) {
 
   // process fetched data
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index c124c8d..191251d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -120,6 +120,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
     Assert.assertEquals(expectedConfig.replicaFetchMaxBytes, actualConfig.replicaFetchMaxBytes)
     Assert.assertEquals(expectedConfig.replicaFetchWaitMaxMs, actualConfig.replicaFetchWaitMaxMs)
     Assert.assertEquals(expectedConfig.replicaFetchMinBytes, actualConfig.replicaFetchMinBytes)
+    Assert.assertEquals(expectedConfig.replicaFetchBackoffMs, actualConfig.replicaFetchBackoffMs)
     Assert.assertEquals(expectedConfig.numReplicaFetchers, actualConfig.numReplicaFetchers)
     Assert.assertEquals(expectedConfig.replicaHighWatermarkCheckpointIntervalMs, actualConfig.replicaHighWatermarkCheckpointIntervalMs)
     Assert.assertEquals(expectedConfig.fetchPurgatoryPurgeIntervalRequests, actualConfig.fetchPurgatoryPurgeIntervalRequests)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 52c7920..0ad1e2c 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -164,6 +164,8 @@ object TestUtils extends Logging {
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
    props.put("replica.socket.timeout.ms", "1500")
+    props.put("replica.fetch.backoff.ms", "1500")
+    props.put("controller.socket.timeout.ms", "1500")
     props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
     props.put("delete.topic.enable", enableDeleteTopic.toString)
     props
-- 
1.9.3 (Apple Git-50)
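
For context, here is a minimal standalone sketch (not part of the patch) of the
back-off idiom the change relies on: a one-shot CountDownLatch doubles as an
interruptible sleep, so a fetch error pauses the thread for up to the configured
back-off, while shutdown can cut the wait short. The class and method names
below (BackOffSketch, onFetchError) are invented for illustration; only
backOffWaitLatch, fetchBackOffMs, and the await()/countDown() calls mirror the
patch.

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    class BackOffSketch(fetchBackOffMs: Long) {
      // One-shot latch used purely as an interruptible sleep: await() blocks
      // for up to fetchBackOffMs, but returns immediately once countDown()
      // has been called.
      private val backOffWaitLatch = new CountDownLatch(1)

      def onFetchError(): Unit = {
        // On a partition fetch error, pause instead of spinning in a hot retry loop.
        backOffWaitLatch.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
      }

      def shutdown(): Unit = {
        // Wakes any thread parked in await(), so shutdown does not have to
        // wait out the remaining back-off interval (unlike a plain Thread.sleep).
        backOffWaitLatch.countDown()
      }
    }

This is also why the patched shutdown() in AbstractFetcherThread counts the
latch down before calling super.shutdown(): a fetcher blocked in a back-off
wait is released immediately instead of delaying broker shutdown by up to
replica.fetch.backoff.ms.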