From 5479e5d0e3f17c7f89a29fbac426f7a28c91e17a Mon Sep 17 00:00:00 2001
From: Andrii Biletskyi <andrii.biletskyi@stealth.ly>
Date: Mon, 6 Jul 2015 16:21:53 +0300
Subject: [PATCH] KAFKA-2310 - Add config to prevent broker becoming controller

---
 .../scala/kafka/controller/KafkaController.scala   | 12 +++++--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 +++
 .../scala/kafka/server/NoOpLeaderElector.scala     | 40 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 2 deletions(-)
 create mode 100644 core/src/main/scala/kafka/server/NoOpLeaderElector.scala

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 66df6d2..93eba1c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -158,8 +158,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
-  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
-    onControllerResignation, config.brokerId)
+
+  private val controllerElector =
+    if (config.controllerEligibility) {
+      new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
+        onControllerResignation, config.brokerId)
+    } else {
+      info("This broker is started in a non-controller mode - it will never be elected as controller")
+      new NoOpLeaderElector
+    }
+
   // have a separate scheduler for the controller to be able to start and stop independently of the
   // kafka server
   private val autoRebalanceScheduler = new KafkaScheduler(1)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index e5fecae..ea4be55 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -89,6 +89,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the number of queued requests allowed before blocking the network threads */
   val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue))
 
+  /* Broker eligibility for being elected as controller. If set to false, this broker will never
+  participate in leader election and thus won't serve as controller" */
+  val controllerEligibility = props.getBoolean("controller.eligibility", true)
+
   /*********** Socket Server Configuration ***********/
 
   /* the port to listen and accept connections on */
diff --git a/core/src/main/scala/kafka/server/NoOpLeaderElector.scala b/core/src/main/scala/kafka/server/NoOpLeaderElector.scala
new file mode 100644
index 0000000..45a6e0a
--- /dev/null
+++ b/core/src/main/scala/kafka/server/NoOpLeaderElector.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+
+class NoOpLeaderElector extends LeaderElector with Logging {
+  override def startup: Unit = {
+    trace("Broker is started in non-controller mode - leader elector will take no actions upon startup")
+  }
+
+  override def elect: Boolean = {
+    trace("Broker is started in non-controller mode - leader elector will take no actions upon election")
+    false
+  }
+
+  override def close: Unit = {
+    trace("Broker is started in non-controller mode - leader elector will take no actions upon closing")
+  }
+
+  override def amILeader: Boolean = {
+    trace("Broker is started in non-controller mode")
+    false
+  }
+}
\ No newline at end of file
-- 
1.9.1

