From e3c653118e997ab60603387a069b59e5a10e522b Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sun, 19 Apr 2015 11:09:58 -0700 Subject: [PATCH] KAFKA-2122. Remove controller.message.queue.size Config. --- core/src/main/scala/kafka/controller/ControllerChannelManager.scala | 3 ++- core/src/main/scala/kafka/server/KafkaConfig.scala | 5 ----- core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala | 2 -- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 97acdb2..13f02c6 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -76,7 +76,8 @@ class ControllerChannelManager (private val controllerContext: ControllerContext } private def addNewBroker(broker: Broker) { - val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize) + // we are setting the messageQueue size to be Int.MaxValue , for more info refer KAFKA-1305 + val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](Int.MaxValue) debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id)) val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol) val channel = new BlockingChannel(brokerEndPoint.host, brokerEndPoint.port, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 69b772c..c83e494 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -196,7 +196,6 @@ object KafkaConfig { val MinInSyncReplicasProp = "min.insync.replicas" /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms" - val ControllerMessageQueueSizeProp = "controller.message.queue.size" val DefaultReplicationFactorProp = "default.replication.factor" val ReplicaLagTimeMaxMsProp = "replica.lag.time.max.ms" val ReplicaSocketTimeoutMsProp = "replica.socket.timeout.ms" @@ -444,7 +443,6 @@ object KafkaConfig { /** ********* Replication configuration ***********/ .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc) - .define(ControllerMessageQueueSizeProp, INT, Defaults.ControllerMessageQueueSize, MEDIUM, ControllerMessageQueueSizeDoc) .define(DefaultReplicationFactorProp, INT, Defaults.DefaultReplicationFactor, MEDIUM, DefaultReplicationFactorDoc) .define(ReplicaLagTimeMaxMsProp, LONG, Defaults.ReplicaLagTimeMaxMs, HIGH, ReplicaLagTimeMaxMsDoc) .define(ReplicaSocketTimeoutMsProp, INT, Defaults.ReplicaSocketTimeoutMs, HIGH, ReplicaSocketTimeoutMsDoc) @@ -565,7 +563,6 @@ object KafkaConfig { /** ********* Replication configuration ***********/ controllerSocketTimeoutMs = parsed.get(ControllerSocketTimeoutMsProp).asInstanceOf[Int], - controllerMessageQueueSize = parsed.get(ControllerMessageQueueSizeProp).asInstanceOf[Int], defaultReplicationFactor = parsed.get(DefaultReplicationFactorProp).asInstanceOf[Int], replicaLagTimeMaxMs = parsed.get(ReplicaLagTimeMaxMsProp).asInstanceOf[Long], replicaSocketTimeoutMs = parsed.get(ReplicaSocketTimeoutMsProp).asInstanceOf[Int], @@ -708,7 +705,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ /** ********* Replication configuration ***********/ val controllerSocketTimeoutMs: Int = Defaults.ControllerSocketTimeoutMs, - val controllerMessageQueueSize: Int = Defaults.ControllerMessageQueueSize, val defaultReplicationFactor: Int = Defaults.DefaultReplicationFactor, val replicaLagTimeMaxMs: Long = Defaults.ReplicaLagTimeMaxMs, val replicaSocketTimeoutMs: Int = Defaults.ReplicaSocketTimeoutMs, @@ -924,7 +920,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ /** ********* Replication configuration ***********/ props.put(ControllerSocketTimeoutMsProp, controllerSocketTimeoutMs.toString) - props.put(ControllerMessageQueueSizeProp, controllerMessageQueueSize.toString) props.put(DefaultReplicationFactorProp, defaultReplicationFactor.toString) props.put(ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString) props.put(ReplicaSocketTimeoutMsProp, replicaSocketTimeoutMs.toString) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index 62d1832..8014a5a 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -112,7 +112,6 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { Assert.assertEquals(expectedConfig.minInSyncReplicas, actualConfig.minInSyncReplicas) Assert.assertEquals(expectedConfig.controllerSocketTimeoutMs, actualConfig.controllerSocketTimeoutMs) - Assert.assertEquals(expectedConfig.controllerMessageQueueSize, actualConfig.controllerMessageQueueSize) Assert.assertEquals(expectedConfig.defaultReplicationFactor, actualConfig.defaultReplicationFactor) Assert.assertEquals(expectedConfig.replicaLagTimeMaxMs, actualConfig.replicaLagTimeMaxMs) Assert.assertEquals(expectedConfig.replicaSocketTimeoutMs, actualConfig.replicaSocketTimeoutMs) @@ -313,7 +312,6 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.ControllerSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ControllerMessageQueueSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") -- 1.9.5 (Apple Git-50.3)