From 5b3664376ea516d75b1a976cb922d22b5f2affd4 Mon Sep 17 00:00:00 2001 From: Andrii Biletskyi Date: Mon, 6 Jul 2015 17:16:46 +0300 Subject: [PATCH] KAFKA-2310 - Add config to prevent broker becoming controller --- .../scala/kafka/controller/KafkaController.scala | 12 +++++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 6 ++++ .../scala/kafka/server/NoOpLeaderElector.scala | 39 ++++++++++++++++++++++ .../kafka/server/KafkaConfigConfigDefTest.scala | 3 ++ 4 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 core/src/main/scala/kafka/server/NoOpLeaderElector.scala diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 3635057..498213c 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -155,8 +155,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs) val partitionStateMachine = new PartitionStateMachine(this) val replicaStateMachine = new ReplicaStateMachine(this) - private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, - onControllerResignation, config.brokerId) + + private val controllerElector = + if (config.controllerEligibility) { + new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, + onControllerResignation, config.brokerId) + } else { + info("This broker is started in a non-controller mode - it will never be elected as controller") + new NoOpLeaderElector + } + // have a separate scheduler for the controller to be able to start and stop independently of the // kafka server private val autoRebalanceScheduler = new KafkaScheduler(1) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index c1f0cca..25e1b26 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -44,6 +44,7 @@ object Defaults { val NumIoThreads = 8 val BackgroundThreads = 10 val QueuedMaxRequests = 500 + val ControllerEligibility = true /** ********* Socket Server Configuration ***********/ val Port = 9092 @@ -159,6 +160,7 @@ object KafkaConfig { val NumIoThreadsProp = "num.io.threads" val BackgroundThreadsProp = "background.threads" val QueuedMaxRequestsProp = "queued.max.requests" + val ControllerEligibilityProp = "controller.eligibility" /** ********* Socket Server Configuration ***********/ val PortProp = "port" val HostNameProp = "host.name" @@ -274,6 +276,8 @@ object KafkaConfig { val NumIoThreadsDoc = "The number of io threads that the server uses for carrying out network requests" val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks" val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads" + val ControllerEligibilityDoc = "Broker eligibility for being elected as controller. If set to false, this broker will " + + "never participate in leader election and thus won't serve as controller" /** ********* Socket Server Configuration ***********/ val PortDoc = "the port to listen and accept connections on" val HostNameDoc = "hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces" @@ -419,6 +423,7 @@ object KafkaConfig { .define(NumIoThreadsProp, INT, Defaults.NumIoThreads, atLeast(1), HIGH, NumIoThreadsDoc) .define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc) .define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc) + .define(ControllerEligibilityProp, BOOLEAN, Defaults.ControllerEligibility, MEDIUM, ControllerEligibilityDoc) /** ********* Socket Server Configuration ***********/ .define(PortProp, INT, Defaults.Port, HIGH, PortDoc) @@ -567,6 +572,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp) val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp) val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp) + val controllerEligibility = getBoolean(KafkaConfig.ControllerEligibilityProp) /** ********* Socket Server Configuration ***********/ val hostName = getString(KafkaConfig.HostNameProp) diff --git a/core/src/main/scala/kafka/server/NoOpLeaderElector.scala b/core/src/main/scala/kafka/server/NoOpLeaderElector.scala new file mode 100644 index 0000000..0235b5b --- /dev/null +++ b/core/src/main/scala/kafka/server/NoOpLeaderElector.scala @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.utils.Logging + +class NoOpLeaderElector extends LeaderElector with Logging { + override def startup: Unit = { + trace("Broker is started in non-controller mode - leader elector will take no actions upon startup") + } + + override def elect: Boolean = { + trace("Broker is started in non-controller mode - leader elector will take no actions upon election") + false + } + + override def close: Unit = { + trace("Broker is started in non-controller mode - leader elector will take no actions upon closing") + } + + override def amILeader: Boolean = { + trace("Broker is started in non-controller mode") + false + } +} diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index 98a5b04..0fb0c0e 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -49,6 +49,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { Assert.assertEquals(expectedConfig.numIoThreads, actualConfig.numIoThreads) Assert.assertEquals(expectedConfig.backgroundThreads, actualConfig.backgroundThreads) Assert.assertEquals(expectedConfig.queuedMaxRequests, actualConfig.queuedMaxRequests) + Assert.assertEquals(expectedConfig.controllerEligibility, actualConfig.controllerEligibility) Assert.assertEquals(expectedConfig.port, actualConfig.port) Assert.assertEquals(expectedConfig.hostName, actualConfig.hostName) @@ -155,6 +156,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.NumIoThreadsProp => expected.setProperty(name, atLeastOneIntProp) case KafkaConfig.BackgroundThreadsProp => expected.setProperty(name, atLeastOneIntProp) case KafkaConfig.QueuedMaxRequestsProp => expected.setProperty(name, atLeastOneIntProp) + case KafkaConfig.ControllerEligibilityProp => expected.setProperty(name, randFrom("true", "false")) case KafkaConfig.PortProp => expected.setProperty(name, "1234") case KafkaConfig.HostNameProp => expected.setProperty(name, nextString(10)) @@ -263,6 +265,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.ControllerEligibilityProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.HostNameProp => // ignore string -- 1.9.1