From b57ac3c5c41b341de72e8e2a1c3abd377b7ebc3d Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sun, 19 Apr 2015 12:18:26 -0700 Subject: [PATCH 1/2] KAFKA-2122. Remove controller.message.queue.size Config. --- core/src/main/scala/kafka/controller/ControllerChannelManager.scala | 3 ++- core/src/main/scala/kafka/server/KafkaConfig.scala | 5 ----- core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala | 2 -- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 97acdb2..ef42561 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -76,7 +76,8 @@ class ControllerChannelManager (private val controllerContext: ControllerContext } private def addNewBroker(broker: Broker) { - val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize) + // we are setting the messageQueue size to be Int.MaxValue , for more info refer KAFKA-1305 + val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]() debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id)) val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol) val channel = new BlockingChannel(brokerEndPoint.host, brokerEndPoint.port, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index cfbbd2b..9efa15c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -196,7 +196,6 @@ object KafkaConfig { val MinInSyncReplicasProp = "min.insync.replicas" /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms" - val ControllerMessageQueueSizeProp = "controller.message.queue.size" val DefaultReplicationFactorProp = "default.replication.factor" val ReplicaLagTimeMaxMsProp = "replica.lag.time.max.ms" val ReplicaSocketTimeoutMsProp = "replica.socket.timeout.ms" @@ -444,7 +443,6 @@ object KafkaConfig { /** ********* Replication configuration ***********/ .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc) - .define(ControllerMessageQueueSizeProp, INT, Defaults.ControllerMessageQueueSize, MEDIUM, ControllerMessageQueueSizeDoc) .define(DefaultReplicationFactorProp, INT, Defaults.DefaultReplicationFactor, MEDIUM, DefaultReplicationFactorDoc) .define(ReplicaLagTimeMaxMsProp, LONG, Defaults.ReplicaLagTimeMaxMs, HIGH, ReplicaLagTimeMaxMsDoc) .define(ReplicaSocketTimeoutMsProp, INT, Defaults.ReplicaSocketTimeoutMs, HIGH, ReplicaSocketTimeoutMsDoc) @@ -565,7 +563,6 @@ object KafkaConfig { /** ********* Replication configuration ***********/ controllerSocketTimeoutMs = parsed.get(ControllerSocketTimeoutMsProp).asInstanceOf[Int], - controllerMessageQueueSize = parsed.get(ControllerMessageQueueSizeProp).asInstanceOf[Int], defaultReplicationFactor = parsed.get(DefaultReplicationFactorProp).asInstanceOf[Int], replicaLagTimeMaxMs = parsed.get(ReplicaLagTimeMaxMsProp).asInstanceOf[Long], replicaSocketTimeoutMs = parsed.get(ReplicaSocketTimeoutMsProp).asInstanceOf[Int], @@ -708,7 +705,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ /** ********* Replication configuration ***********/ val controllerSocketTimeoutMs: Int = Defaults.ControllerSocketTimeoutMs, - val controllerMessageQueueSize: Int = Defaults.ControllerMessageQueueSize, val defaultReplicationFactor: Int = Defaults.DefaultReplicationFactor, val replicaLagTimeMaxMs: Long = Defaults.ReplicaLagTimeMaxMs, val replicaSocketTimeoutMs: Int = Defaults.ReplicaSocketTimeoutMs, @@ -930,7 +926,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ /** ********* Replication configuration ***********/ props.put(ControllerSocketTimeoutMsProp, controllerSocketTimeoutMs.toString) - props.put(ControllerMessageQueueSizeProp, controllerMessageQueueSize.toString) props.put(DefaultReplicationFactorProp, defaultReplicationFactor.toString) props.put(ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString) props.put(ReplicaSocketTimeoutMsProp, replicaSocketTimeoutMs.toString) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index 62d1832..8014a5a 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -112,7 +112,6 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { Assert.assertEquals(expectedConfig.minInSyncReplicas, actualConfig.minInSyncReplicas) Assert.assertEquals(expectedConfig.controllerSocketTimeoutMs, actualConfig.controllerSocketTimeoutMs) - Assert.assertEquals(expectedConfig.controllerMessageQueueSize, actualConfig.controllerMessageQueueSize) Assert.assertEquals(expectedConfig.defaultReplicationFactor, actualConfig.defaultReplicationFactor) Assert.assertEquals(expectedConfig.replicaLagTimeMaxMs, actualConfig.replicaLagTimeMaxMs) Assert.assertEquals(expectedConfig.replicaSocketTimeoutMs, actualConfig.replicaSocketTimeoutMs) @@ -313,7 +312,6 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.ControllerSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ControllerMessageQueueSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") -- 1.9.5 (Apple Git-50.3) From d8d3e6de384bf01239a5c00b831e8cd448a2cb50 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sun, 19 Apr 2015 12:43:51 -0700 Subject: [PATCH 2/2] KAFKA-2122. Remove controller.message.queue.size Config. --- system_test/mirror_maker_testsuite/config/server.properties | 1 - system_test/offset_management_testsuite/config/server.properties | 1 - .../testcase_7002/config/kafka_server_1.properties | 1 - .../testcase_7002/config/kafka_server_2.properties | 1 - .../testcase_7002/config/kafka_server_3.properties | 1 - .../testcase_7002/config/kafka_server_4.properties | 1 - system_test/replication_testsuite/config/server.properties | 1 - 7 files changed, 7 deletions(-) diff --git a/system_test/mirror_maker_testsuite/config/server.properties b/system_test/mirror_maker_testsuite/config/server.properties index c628412..9717cd6 100644 --- a/system_test/mirror_maker_testsuite/config/server.properties +++ b/system_test/mirror_maker_testsuite/config/server.properties @@ -128,7 +128,6 @@ log.index.size.max.bytes=10485760 log.index.interval.bytes=4096 auto.create.topics.enable=true controller.socket.timeout.ms=30000 -controller.message.queue.size=10 default.replication.factor=1 replica.lag.time.max.ms=10000 replica.lag.max.messages=4000 diff --git a/system_test/offset_management_testsuite/config/server.properties b/system_test/offset_management_testsuite/config/server.properties index 2b988f8..b6de528 100644 --- a/system_test/offset_management_testsuite/config/server.properties +++ b/system_test/offset_management_testsuite/config/server.properties @@ -128,7 +128,6 @@ log.index.size.max.bytes=10485760 log.index.interval.bytes=4096 auto.create.topics.enable=true controller.socket.timeout.ms=30000 -controller.message.queue.size=10 default.replication.factor=1 replica.lag.time.max.ms=10000 replica.lag.max.messages=4000 diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties index 41ec6e4..9efbd9d 100644 --- a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties +++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties @@ -128,7 +128,6 @@ log.index.size.max.bytes=10485760 log.index.interval.bytes=4096 auto.create.topics.enable=true controller.socket.timeout.ms=30000 -controller.message.queue.size=10 default.replication.factor=3 replica.lag.time.max.ms=10000 replica.lag.max.messages=4000 diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties index 727e237..d4bf702 100644 --- a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties +++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties @@ -128,7 +128,6 @@ log.index.size.max.bytes=10485760 log.index.interval.bytes=4096 auto.create.topics.enable=true controller.socket.timeout.ms=30000 -controller.message.queue.size=10 default.replication.factor=3 replica.lag.time.max.ms=10000 replica.lag.max.messages=4000 diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties index e6fbbe1..e6e06be 100644 --- a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties +++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties @@ -128,7 +128,6 @@ log.index.size.max.bytes=10485760 log.index.interval.bytes=4096 auto.create.topics.enable=true controller.socket.timeout.ms=30000 -controller.message.queue.size=10 default.replication.factor=3 replica.lag.time.max.ms=10000 replica.lag.max.messages=4000 diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties index fee65bc..2cb03e4 100644 --- a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties +++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties @@ -128,7 +128,6 @@ log.index.size.max.bytes=10485760 log.index.interval.bytes=4096 auto.create.topics.enable=true controller.socket.timeout.ms=30000 -controller.message.queue.size=10 default.replication.factor=3 replica.lag.time.max.ms=10000 replica.lag.max.messages=4000 diff --git a/system_test/replication_testsuite/config/server.properties b/system_test/replication_testsuite/config/server.properties index 6becbab..d1dff68 100644 --- a/system_test/replication_testsuite/config/server.properties +++ b/system_test/replication_testsuite/config/server.properties @@ -128,7 +128,6 @@ log.index.size.max.bytes=10485760 log.index.interval.bytes=4096 auto.create.topics.enable=true controller.socket.timeout.ms=30000 -controller.message.queue.size=10 default.replication.factor=1 replica.lag.time.max.ms=10000 replica.lag.max.messages=4000 -- 1.9.5 (Apple Git-50.3)