diff --git a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala b/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala index a1e1279..b66c8fc 100644 --- a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala +++ b/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala @@ -20,7 +20,8 @@ package kafka.common /** * This exception is thrown by the leader elector in the controller when leader election fails for a partition since - * all the replicas for a partition are offline + * all the leader candidate replicas for a partition are offline; the set of candidates may or may not be limited + * to just the in sync replicas depending upon whether unclean leader election is allowed to occur. */ class NoReplicaOnlineException(message: String, cause: Throwable) extends RuntimeException(message, cause) { def this(message: String) = this(message, null) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a0267ae..e2b1812 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -135,7 +135,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // have a separate scheduler for the controller to be able to start and stop independently of the // kafka server private val autoRebalanceScheduler = new KafkaScheduler(1) - val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext) + val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index fd9200f..1528764 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -19,6 +19,7 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.utils.Logging import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} +import kafka.server.KafkaConfig trait PartitionLeaderSelector { @@ -37,12 +38,14 @@ trait PartitionLeaderSelector { * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest): * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live * isr as the new isr. - * 2. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr. - * 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException + * 2. Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException. + * 3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr. + * 4. 
If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException * Replicas to receive LeaderAndIsr request = live assigned replicas * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache */ -class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { +class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig) + extends PartitionLeaderSelector with Logging { this.logIdent = "[OfflinePartitionLeaderSelector]: " def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { @@ -54,6 +57,14 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion val newLeaderAndIsr = liveBrokersInIsr.isEmpty match { case true => + // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the corresponding + // configuration settings. + if (!config.isUncleanElectionEnabled(topicAndPartition.topic)) { + throw new NoReplicaOnlineException(("No broker in ISR for partition " + + "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + + " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(","))) + } + debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s" .format(topicAndPartition, liveAssignedReplicasToThisPartition.mkString(","))) liveAssignedReplicasToThisPartition.isEmpty match { @@ -77,7 +88,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition)) (newLeaderAndIsr, liveAssignedReplicasToThisPartition) case None => - throw new NoReplicaOnlineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it") + throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition)) } } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3c3aafc..0b60ae9 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -231,6 +231,30 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the frequency with which the partition rebalance check is triggered by the controller */ val leaderImbalanceCheckIntervalSeconds = props.getInt("leader.imbalance.check.interval.seconds", 300) + /* indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though + * doing so may result in data loss */ + val uncleanLeaderElectionEnable = props.getBoolean("replica.unclean.election.enable", true) + + /* regular expression specifying override topics for which unclean leader election is enabled */ + val uncleanLeaderElectionEnabledTopics = props.getString("replica.unclean.election.topics", + if(uncleanLeaderElectionEnable) ".*" else "").r + + /* regular expression specifying override topics for which unclean leader election is disabled */ + val uncleanLeaderElectionDisabledTopics = props.getString("replica.clean.election.topics", + if(uncleanLeaderElectionEnable) "" else ".*").r + + /** + * Query unclean election permissibility for a topic. 
+ * + * @param topic The topic for which unclean election is being considered + * @return true if unclean election is permitted for the topic, or false otherwise + */ + def isUncleanElectionEnabled(topic: String): Boolean = { + uncleanLeaderElectionEnable match { + case true => !uncleanLeaderElectionDisabledTopics.pattern.matcher(topic).matches() + case false => uncleanLeaderElectionEnabledTopics.pattern.matcher(topic).matches() + } + } /*********** Controlled shutdown configuration ***********/ diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 73e605e..029cff1 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -81,9 +81,19 @@ class ReplicaFetcherThread(name:String, */ val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId) if (leaderEndOffset < replica.logEndOffset) { + if (!brokerConfig.isUncleanElectionEnabled(topicAndPartition.topic)) { + // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. + // Otherwise, we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker + // configuration. Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur. + fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) + + " Current leader %d's latest offset %d is less than replica %d's latest offset %d" + .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset)) + Runtime.getRuntime.halt(1) + } + replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset)) - warn("Replica %d for partition %s reset its fetch offset to current leader %d's latest offset %d" - .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderEndOffset)) + warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d" + .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderEndOffset)) leaderEndOffset } else { /** @@ -94,8 +104,8 @@ class ReplicaFetcherThread(name:String, */ val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId) replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset) - warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d" - .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset)) + warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d" + .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderStartOffset)) leaderStartOffset } } diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala index b585f0e..e86ee80 100644 --- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package kafka.server +package kafka.integration import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness @@ -27,6 +27,7 @@ import kafka.cluster.Broker import kafka.common.ErrorMapping import kafka.api._ import kafka.admin.AdminUtils +import kafka.server.{KafkaConfig, KafkaServer} class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { val brokerId1 = 0 diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala new file mode 100644 index 0000000..4b7a6cd --- /dev/null +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.integration + +import scala.collection.mutable.MutableList +import scala.util.Random +import org.apache.log4j.{Level, Logger} +import org.scalatest.junit.JUnit3Suite +import java.util.Properties +import junit.framework.Assert._ +import kafka.admin.AdminUtils +import kafka.common.FailedToSendMessageException +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException} +import kafka.producer.{KeyedMessage, Producer} +import kafka.serializer.{DefaultEncoder, StringEncoder} +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.Utils +import kafka.utils.TestUtils._ +import kafka.zk.ZooKeeperTestHarness + +class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { + val brokerId1 = 0 + val brokerId2 = 1 + + val port1 = choosePort() + val port2 = choosePort() + + // controlled shutdown is needed for these tests, but we can trim the retry count and backoff interval to + // reduce test execution time + val enableControlledShutdown = true + val configProps1 = createBrokerConfig(brokerId1, port1) + val configProps2 = createBrokerConfig(brokerId2, port2) + + for (configProps <- List(configProps1, configProps2)) { + configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown)) + configProps.put("controlled.shutdown.max.retries", String.valueOf(1)) + configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000)) + } + + var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig] + var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + + val random = new Random() + val topic = "topic" + random.nextLong + val partitionId = 0 + + val kafkaApisLogger = Logger.getLogger(classOf[kafka.server.KafkaApis]) + val networkProcessorLogger = Logger.getLogger(classOf[kafka.network.Processor]) + val syncProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer]) + val eventHandlerLogger = Logger.getLogger(classOf[kafka.producer.async.DefaultEventHandler[Object, Object]]) + + override def setUp() { + 
super.setUp() + + // temporarily set loggers to a higher level so that tests run quietly + kafkaApisLogger.setLevel(Level.FATAL) + networkProcessorLogger.setLevel(Level.FATAL) + syncProducerLogger.setLevel(Level.FATAL) + eventHandlerLogger.setLevel(Level.FATAL) + } + + override def tearDown() { + servers.map(server => server.shutdown()) + servers.map(server => Utils.rm(server.config.logDirs)) + + // restore log levels + kafkaApisLogger.setLevel(Level.ERROR) + networkProcessorLogger.setLevel(Level.ERROR) + syncProducerLogger.setLevel(Level.ERROR) + eventHandlerLogger.setLevel(Level.ERROR) + + super.tearDown() + } + + private def startBrokers(cluster: Seq[Properties]) { + for (props <- cluster) { + val config = new KafkaConfig(props) + val server = createServer(config) + configs ++= List(config) + servers ++= List(server) + } + } + + def testUncleanLeaderElectionEnabled { + // unclean leader election is enabled by default + startBrokers(Seq(configProps1, configProps2)) + + // create topic with 1 partition, 2 replicas, one on each broker + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2))) + + // wait until leader is elected + val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000) + assertTrue("Leader should get elected", leaderIdOpt.isDefined) + val leaderId = leaderIdOpt.get + debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) + assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2) + + // the non-leader broker is the follower + val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 + debug("Follower for " + topic + " is: %s".format(followerId)) + + produceMessage(topic, "first") + waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000) + assertEquals(List("first"), consumeAllMessages(topic)) + + // shutdown follower server + servers.filter(server => server.config.brokerId == followerId).map(server => server.shutdown()) + + produceMessage(topic, "second") + assertEquals(List("first", "second"), consumeAllMessages(topic)) + + // shutdown leader and then restart follower + servers.filter(server => server.config.brokerId == leaderId).map(server => server.shutdown()) + servers.filter(server => server.config.brokerId == followerId).map(server => server.startup()) + + // wait until new leader is (uncleanly) elected + val newLeaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId)) + assertTrue("New leader should get elected", newLeaderIdOpt.isDefined) + assertEquals(followerId, newLeaderIdOpt.get) + + produceMessage(topic, "third") + + // second message was lost due to unclean election + assertEquals(List("first", "third"), consumeAllMessages(topic)) + } + + def testUncleanLeaderElectionDisabled { + // disable unclean leader election + if (configProps1.getProperty("replica.unclean.election.enable") == null) + configProps1.put("replica.unclean.election.enable", String.valueOf(false)) + if (configProps2.getProperty("replica.unclean.election.enable") == null) + configProps2.put("replica.unclean.election.enable", String.valueOf(false)) + startBrokers(Seq(configProps1, configProps2)) + + // create topic with 1 partition, 2 replicas, one on each broker + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2))) + + // wait until leader is elected + val leaderIdOpt = 
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000) + assertTrue("Leader should get elected", leaderIdOpt.isDefined) + val leaderId = leaderIdOpt.get + debug("Leader for " + topic + " is elected to be: %s".format(leaderId)) + assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2) + + // the non-leader broker is the follower + val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 + debug("Follower for " + topic + " is: %s".format(followerId)) + + produceMessage(topic, "first") + waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000) + assertEquals(List("first"), consumeAllMessages(topic)) + + // shutdown follower server + servers.filter(server => server.config.brokerId == followerId).map(server => server.shutdown()) + + produceMessage(topic, "second") + assertEquals(List("first", "second"), consumeAllMessages(topic)) + + // shutdown leader and then restart follower + servers.filter(server => server.config.brokerId == leaderId).map(server => server.shutdown()) + servers.filter(server => server.config.brokerId == followerId).map(server => server.startup()) + + // verify that unclean election to non-ISR follower does not occur + val newLeaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId)) + assertTrue("Leader should still be elected", newLeaderIdOpt.isDefined) + assertEquals(leaderId, newLeaderIdOpt.get) + + // message production and consumption should both fail while leader is down + intercept[FailedToSendMessageException] { + produceMessage(topic, "third") + } + assertEquals(List.empty[String], consumeAllMessages(topic)) + + // restart leader temporarily to send a successfully replicated message + servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup()) + produceMessage(topic, "third") + waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000) + servers.filter(server => server.config.brokerId == leaderId).map(server => server.shutdown()) + + // verify clean leader transition to ISR follower + val newLeaderIdOpt2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId)) + assertTrue("New leader should be elected", newLeaderIdOpt2.isDefined) + assertEquals(followerId, newLeaderIdOpt2.get) + + // verify messages can be consumed from ISR follower that was just promoted to leader + assertEquals(List("first", "second", "third"), consumeAllMessages(topic)) + } + + def testUncleanLeaderElectionEnabledByTopicOverride { + // disable unclean leader election globally, but enable for our specific test topic + configProps1.put("replica.unclean.election.enable", String.valueOf(false)) + configProps2.put("replica.unclean.election.enable", String.valueOf(false)) + configProps2.put("replica.unclean.election.topics", topic) + testUncleanLeaderElectionEnabled + } + + def testCleanLeaderElectionDisabledByTopicOverride { + // enable unclean leader election globally, but disable for our specific test topic + configProps1.put("replica.unclean.election.enable", String.valueOf(true)) + configProps2.put("replica.unclean.election.enable", String.valueOf(true)) + configProps2.put("replica.clean.election.topics", topic) + testUncleanLeaderElectionDisabled + } + + private def produceMessage(topic: String, message: String) = { + val props = new Properties() + props.put("request.required.acks", String.valueOf(-1)) + val producer: Producer[String, Array[Byte]] = 
createProducer(getBrokerListStrFromConfigs(configs), + new DefaultEncoder(), new StringEncoder(), props) + producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes)) + producer.close() + } + + private def consumeAllMessages(topic: String) : List[String] = { + // use a fresh consumer group every time so that we don't need to mess with disabling auto-commit or + // resetting the ZK offset + val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id", 1000) + val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps)) + val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head + + val messages = new MutableList[String] + val iter = messageStream.iterator + try { + while(iter.hasNext()) { + messages += new String(iter.next.message) // will throw a timeout exception if the message isn't there + } + } catch { + case e: ConsumerTimeoutException => + debug("consumer timed out after receiving " + messages.length + " message(s).") + } finally { + consumerConnector.shutdown + } + messages.toList + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 89c207a..2da8480 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -93,5 +93,99 @@ class KafkaConfigTest extends JUnit3Suite { assertEquals(serverConfig.advertisedHostName, advertisedHostName) assertEquals(serverConfig.advertisedPort, advertisedPort) } - + + @Test + def testUncleanElectionDefaults() { + val props = TestUtils.createBrokerConfig(0, 8181) + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.uncleanLeaderElectionEnable, true) + assertEquals(serverConfig.uncleanLeaderElectionEnabledTopics.pattern.pattern(), ".*") + assertEquals(serverConfig.uncleanLeaderElectionDisabledTopics.pattern.pattern(), "") + assertEquals(serverConfig.isUncleanElectionEnabled("some.topic"), true) + } + + @Test + def testUncleanElectionDisabled() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("replica.unclean.election.enable", String.valueOf(false)) + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.uncleanLeaderElectionEnable, false) + assertEquals(serverConfig.uncleanLeaderElectionEnabledTopics.pattern.pattern(), "") + assertEquals(serverConfig.uncleanLeaderElectionDisabledTopics.pattern.pattern(), ".*") + assertEquals(serverConfig.isUncleanElectionEnabled("some.topic"), false) + } + + @Test + def testUncleanElectionEnabled() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("replica.unclean.election.enable", String.valueOf(true)) + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.uncleanLeaderElectionEnable, true) + assertEquals(serverConfig.uncleanLeaderElectionEnabledTopics.pattern.pattern(), ".*") + assertEquals(serverConfig.uncleanLeaderElectionDisabledTopics.pattern.pattern(), "") + assertEquals(serverConfig.isUncleanElectionEnabled("some.topic"), true) + } + + @Test + def testUncleanElectionDisabledWithOverrideTopics() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("replica.unclean.election.enable", String.valueOf(false)) + props.put("replica.unclean.election.topics", "ENABLE_.*") + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.uncleanLeaderElectionEnable, false) + 
assertEquals(serverConfig.uncleanLeaderElectionEnabledTopics.pattern.pattern(), "ENABLE_.*") + assertEquals(serverConfig.uncleanLeaderElectionDisabledTopics.pattern.pattern(), ".*") + assertEquals(serverConfig.isUncleanElectionEnabled("some.topic"), false) + assertEquals(serverConfig.isUncleanElectionEnabled("ENABLE_UNCLEAN"), true) + } + + @Test + def testUncleanElectionEnabledWithOverrideTopics() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("replica.unclean.election.enable", String.valueOf(true)) + props.put("replica.clean.election.topics", "DISABLE_.*") + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.uncleanLeaderElectionEnable, true) + assertEquals(serverConfig.uncleanLeaderElectionEnabledTopics.pattern.pattern(), ".*") + assertEquals(serverConfig.uncleanLeaderElectionDisabledTopics.pattern.pattern(), "DISABLE_.*") + assertEquals(serverConfig.isUncleanElectionEnabled("some.topic"), true) + assertEquals(serverConfig.isUncleanElectionEnabled("DISABLE_UNCLEAN"), false) + } + + @Test + def testUncleanElectionInvalid() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("replica.unclean.election.enable", "invalid") + + intercept[IllegalArgumentException] { + new KafkaConfig(props) + } + } + + @Test + def testUncleanElectionInvalidDisableOverrideTopics() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("replica.unclean.election.enable", String.valueOf(true)) + props.put("replica.clean.election.topics", "[invalid") + + intercept[IllegalArgumentException] { + new KafkaConfig(props) + } + } + + @Test + def testUncleanElectionInvalidEnableOverrideTopics() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("replica.unclean.election.enable", String.valueOf(false)) + props.put("replica.unclean.election.topics", "[invalid") + + intercept[IllegalArgumentException] { + new KafkaConfig(props) + } + } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index d88b6c3..c017100 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -293,8 +293,8 @@ object TestUtils extends Logging { */ def createProducer[K, V](brokerList: String, encoder: Encoder[V] = new DefaultEncoder(), - keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = { - val props = new Properties() + keyEncoder: Encoder[K] = new DefaultEncoder(), + props: Properties = new Properties()): Producer[K, V] = { props.put("metadata.broker.list", brokerList) props.put("send.buffer.bytes", "65536") props.put("connect.timeout.ms", "100000")
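The following is a standalone sketch, not part of the patch, restating the override resolution that KafkaConfig.isUncleanElectionEnabled implements above; the object and method names (UncleanElectionPolicy, resolve) are illustrative only. The global replica.unclean.election.enable flag decides which of the two topic regexes acts as the exception list: when the flag is true, replica.clean.election.topics names the topics to exclude, and when it is false, replica.unclean.election.topics names the topics to include.

import java.util.regex.Pattern

// Illustrative restatement of the override semantics in KafkaConfig.isUncleanElectionEnabled.
// Names are hypothetical; the real implementation lives on KafkaConfig and pre-compiles the regexes.
object UncleanElectionPolicy {
  def resolve(enableByDefault: Boolean,      // replica.unclean.election.enable
              enabledTopicsRegex: String,    // replica.unclean.election.topics (defaults to ".*" when enabled, "" otherwise)
              disabledTopicsRegex: String,   // replica.clean.election.topics   (defaults to ""   when enabled, ".*" otherwise)
              topic: String): Boolean =
    if (enableByDefault)
      !Pattern.matches(disabledTopicsRegex, topic)  // enabled unless the topic matches the disabled list
    else
      Pattern.matches(enabledTopicsRegex, topic)    // disabled unless the topic matches the enabled list
}

// For example, with unclean election disabled globally but re-enabled for "ENABLE_.*":
//   UncleanElectionPolicy.resolve(false, "ENABLE_.*", ".*", "ENABLE_UNCLEAN")  // true
//   UncleanElectionPolicy.resolve(false, "ENABLE_.*", ".*", "some.topic")      // false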
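A minimal usage sketch along the lines of the new KafkaConfigTest cases, assuming a test-style broker config built with TestUtils; the port, topic regex, and topic names below are illustrative. It shows a broker configured to forbid unclean leader election by default while allowing it for a chosen family of topics via the new properties.

import java.util.Properties
import kafka.server.KafkaConfig
import kafka.utils.TestUtils

// Disable unclean leader election globally, but permit it for topics matching "LOGS_.*".
val props: Properties = TestUtils.createBrokerConfig(0, 9092)
props.put("replica.unclean.election.enable", "false")
props.put("replica.unclean.election.topics", "LOGS_.*")

val config = new KafkaConfig(props)
assert(!config.isUncleanElectionEnabled("payments"))    // globally disabled
assert(config.isUncleanElectionEnabled("LOGS_clicks"))  // re-enabled by the topic override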
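Finally, a compact sketch of the follower-side guard added to ReplicaFetcherThread, under the assumption that the decision can be read in isolation; the function name and parameters are hypothetical stand-ins for the real calls on replicaMgr.logManager and Runtime.getRuntime.halt(1). When the leader's log end offset is behind the follower's, the follower truncates only if unclean election is permitted for the topic; otherwise it halts rather than silently discard data.

// Hypothetical distillation of the new out-of-range handling; not the patch's actual signature.
def resolveLeaderBehindFollower(topic: String,
                                leaderEndOffset: Long,
                                followerEndOffset: Long,
                                uncleanElectionAllowed: String => Boolean,
                                truncateTo: Long => Unit,
                                haltBroker: () => Unit): Long = {
  require(leaderEndOffset < followerEndOffset)  // caller established that the leader is behind
  if (!uncleanElectionAllowed(topic)) {
    // Truncating would discard messages the follower has but the leader lost; refuse and stop the broker.
    haltBroker()
  }
  truncateTo(leaderEndOffset)  // unclean election permitted: accept the potential loss and resync
  leaderEndOffset
}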