diff --git a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala b/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala
index a1e1279..b66c8fc 100644
--- a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala
+++ b/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala
@@ -20,7 +20,8 @@ package kafka.common
 
 /**
  * This exception is thrown by the leader elector in the controller when leader election fails for a partition since
- * all the replicas for a partition are offline
+ * all the leader candidate replicas for a partition are offline; the set of candidates may or may not be limited
+ * to just the in-sync replicas, depending upon whether unclean leader election is allowed to occur.
  */
 class NoReplicaOnlineException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
   def this(message: String) = this(message, null)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 5db24a7..7dc2718 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -21,10 +21,12 @@ import collection.Set
 import com.yammer.metrics.core.Gauge
 import java.lang.{IllegalStateException, Object}
 import java.util.concurrent.TimeUnit
+import kafka.admin.AdminUtils
 import kafka.admin.PreferredReplicaLeaderElectionCommand
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
+import kafka.log.LogConfig
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
@@ -164,7 +166,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   // kafka server
   private val autoRebalanceScheduler = new KafkaScheduler(1)
   var deleteTopicManager: TopicDeletionManager = null
-  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
+  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
@@ -972,8 +974,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       if (leaderAndIsr.isr.contains(replicaId)) {
         // if the replica to be removed from the ISR is also the leader, set the new leader value to -1
         val newLeader = if(replicaId == leaderAndIsr.leader) -1 else leaderAndIsr.leader
+        var newIsr = leaderAndIsr.isr.filter(b => b != replicaId)
+
+        // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
+        // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
+        // eventually be restored as the leader.
+        if (newIsr.isEmpty && !LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(zkClient,
+          topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+          info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
+          newIsr = leaderAndIsr.isr
+        }
+
         val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1,
-          leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
+          newIsr, leaderAndIsr.zkVersion + 1)
         // update the new leadership decision in zookeeper or retry
         val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
           zkClient,
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index fa29bbe..d3b25fa 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -16,9 +16,12 @@
  */
 package kafka.controller
 
+import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
+import kafka.log.LogConfig
 import kafka.utils.Logging
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
+import kafka.server.KafkaConfig
 
 trait PartitionLeaderSelector {
 
@@ -37,12 +40,14 @@ trait PartitionLeaderSelector {
 * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
 * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live
 *    isr as the new isr.
- * 2. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
- * 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException
+ * 2. Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException.
+ * 3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
+ * 4. If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException
 * Replicas to receive LeaderAndIsr request = live assigned replicas
 * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
 */
-class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
+class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
+  extends PartitionLeaderSelector with Logging {
   this.logIdent = "[OfflinePartitionLeaderSelector]: "
 
   def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
@@ -54,6 +59,15 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
         val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
         val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
           case true =>
+            // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
+            // for unclean leader election.
+            if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
+              topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+              throw new NoReplicaOnlineException(("No broker in ISR for partition " +
+                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
+                " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
+            }
+
             debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
               .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
             liveAssignedReplicas.isEmpty match {
@@ -77,7 +91,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
         info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
         (newLeaderAndIsr, liveAssignedReplicas)
       case None =>
-        throw new NoReplicaOnlineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
+        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
     }
   }
 }
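
The scaladoc above reduces leader selection to a four-way decision. As a rough, standalone Scala sketch of that ordering (hypothetical helper name and deliberately simplified signature; the real selector also recomputes the ISR, leader epoch and zkVersion as shown in the hunk):

    import kafka.common.NoReplicaOnlineException

    // Prefer a live ISR broker; fall back to a live assigned replica only when unclean
    // leader election is enabled for the topic; otherwise give up.
    def chooseLeader(liveIsr: Seq[Int], liveAssignedReplicas: Seq[Int], uncleanElectionEnabled: Boolean): Int = {
      if (liveIsr.nonEmpty)
        liveIsr.head                    // case 1: clean election from the live ISR
      else if (!uncleanElectionEnabled)
        throw new NoReplicaOnlineException("no broker in the ISR is alive and unclean leader election is disabled")  // case 2
      else if (liveAssignedReplicas.nonEmpty)
        liveAssignedReplicas.head       // case 3: unclean election from the live assigned replicas
      else
        throw new NoReplicaOnlineException("no assigned replica is alive")  // case 4
    }
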
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 18c86fe..5746ad4 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,6 +21,23 @@ import java.util.Properties
 import scala.collection._
 import kafka.common._
 
+object Defaults {
+  val SegmentSize = 1024 * 1024
+  val SegmentMs = Long.MaxValue
+  val FlushInterval = Long.MaxValue
+  val FlushMs = Long.MaxValue
+  val RetentionSize = Long.MaxValue
+  val RetentionMs = Long.MaxValue
+  val MaxMessageSize = Int.MaxValue
+  val MaxIndexSize = 1024 * 1024
+  val IndexInterval = 4096
+  val FileDeleteDelayMs = 60 * 1000L
+  val DeleteRetentionMs = 24 * 60 * 60 * 1000L
+  val MinCleanableDirtyRatio = 0.5
+  val Compact = false
+  val UncleanLeaderElectionEnable = true
+}
+
 /**
 * Configuration settings for a log
 * @param segmentSize The soft maximum for the size of a segment file in the log
@@ -35,20 +52,23 @@ import kafka.common._
 * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
 * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
 * @param compact Should old segments in this log be deleted or deduplicated?
+ * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
+ *                                    but included here for topic-specific configuration validation purposes
 */
-case class LogConfig(val segmentSize: Int = 1024*1024,
-                     val segmentMs: Long = Long.MaxValue,
-                     val flushInterval: Long = Long.MaxValue,
-                     val flushMs: Long = Long.MaxValue,
-                     val retentionSize: Long = Long.MaxValue,
-                     val retentionMs: Long = Long.MaxValue,
-                     val maxMessageSize: Int = Int.MaxValue,
-                     val maxIndexSize: Int = 1024*1024,
-                     val indexInterval: Int = 4096,
-                     val fileDeleteDelayMs: Long = 60*1000,
-                     val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
-                     val minCleanableRatio: Double = 0.5,
-                     val compact: Boolean = false) {
+case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
+                     val segmentMs: Long = Defaults.SegmentMs,
+                     val flushInterval: Long = Defaults.FlushInterval,
+                     val flushMs: Long = Defaults.FlushMs,
+                     val retentionSize: Long = Defaults.RetentionSize,
+                     val retentionMs: Long = Defaults.RetentionMs,
+                     val maxMessageSize: Int = Defaults.MaxMessageSize,
+                     val maxIndexSize: Int = Defaults.MaxIndexSize,
+                     val indexInterval: Int = Defaults.IndexInterval,
+                     val fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
+                     val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
+                     val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
+                     val compact: Boolean = Defaults.Compact,
+                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) {
 
   def toProps: Properties = {
     val props = new Properties()
@@ -66,6 +86,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
     props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
     props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
+    props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
     props
   }
 
@@ -85,6 +106,7 @@ object LogConfig {
   val FileDeleteDelayMsProp = "file.delete.delay.ms"
   val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
   val CleanupPolicyProp = "cleanup.policy"
+  val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
 
   val ConfigNames = Set(SegmentBytesProp,
                         SegmentMsProp,
@@ -98,26 +120,31 @@ object LogConfig {
                         FileDeleteDelayMsProp,
                         DeleteRetentionMsProp,
                         MinCleanableDirtyRatioProp,
-                        CleanupPolicyProp)
+                        CleanupPolicyProp,
+                        UncleanLeaderElectionEnableProp)
 
   /**
    * Parse the given properties instance into a LogConfig object
    */
   def fromProps(props: Properties): LogConfig = {
-    new LogConfig(segmentSize = props.getProperty(SegmentBytesProp).toInt,
-                  segmentMs = props.getProperty(SegmentMsProp).toLong,
-                  maxIndexSize = props.getProperty(SegmentIndexBytesProp).toInt,
-                  flushInterval = props.getProperty(FlushMessagesProp).toLong,
-                  flushMs = props.getProperty(FlushMsProp).toLong,
-                  retentionSize = props.getProperty(RetentionBytesProp).toLong,
-                  retentionMs = props.getProperty(RententionMsProp).toLong,
-                  maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt,
-                  indexInterval = props.getProperty(IndexIntervalBytesProp).toInt,
-                  fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
-                  deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong,
-                  minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
-                  compact = props.getProperty(CleanupPolicyProp).trim.toLowerCase != "delete")
+    new LogConfig(segmentSize = props.getProperty(SegmentBytesProp, Defaults.SegmentSize.toString).toInt,
+                  segmentMs = props.getProperty(SegmentMsProp, Defaults.SegmentMs.toString).toLong,
+                  maxIndexSize = props.getProperty(SegmentIndexBytesProp, Defaults.MaxIndexSize.toString).toInt,
+                  flushInterval = props.getProperty(FlushMessagesProp, Defaults.FlushInterval.toString).toLong,
+                  flushMs = props.getProperty(FlushMsProp, Defaults.FlushMs.toString).toLong,
+                  retentionSize = props.getProperty(RetentionBytesProp, Defaults.RetentionSize.toString).toLong,
+                  retentionMs = props.getProperty(RententionMsProp, Defaults.RetentionMs.toString).toLong,
+                  maxMessageSize = props.getProperty(MaxMessageBytesProp, Defaults.MaxMessageSize.toString).toInt,
+                  indexInterval = props.getProperty(IndexIntervalBytesProp, Defaults.IndexInterval.toString).toInt,
+                  fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString).toInt,
+                  deleteRetentionMs = props.getProperty(DeleteRetentionMsProp, Defaults.DeleteRetentionMs.toString).toLong,
+                  minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp,
+                    Defaults.MinCleanableDirtyRatio.toString).toDouble,
+                  compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
+                    .trim.toLowerCase != "delete",
+                  uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
+                    Defaults.UncleanLeaderElectionEnable.toString).toBoolean)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d07796e..08de0ef 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -231,6 +231,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency with which the partition rebalance check is triggered by the controller */
   val leaderImbalanceCheckIntervalSeconds = props.getInt("leader.imbalance.check.interval.seconds", 300)
 
+  /* indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though
+   * doing so may result in data loss */
+  val uncleanLeaderElectionEnable = props.getBoolean("unclean.leader.election.enable", true)
 
   /*********** Controlled shutdown configuration ***********/
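
Both call sites that consult this setting (the controller code above and the replica fetcher below) resolve it the same way: the per-topic configuration fetched from ZooKeeper is layered over the broker's own properties via LogConfig.fromProps, which works because the broker property and the topic-level property share the key "unclean.leader.election.enable". A minimal sketch of that precedence, using stand-in Properties for config.props.props and for the result of AdminUtils.fetchTopicConfig (the two-argument fromProps overload is assumed from the call sites in this patch):

    import java.util.Properties
    import kafka.log.LogConfig

    // Stand-in for the broker's own properties (config.props.props in the patch).
    val brokerProps = new Properties()
    brokerProps.put("unclean.leader.election.enable", "false")    // broker-wide default

    // Stand-in for the per-topic overrides that AdminUtils.fetchTopicConfig returns from ZooKeeper.
    val topicOverrides = new Properties()
    topicOverrides.put("unclean.leader.election.enable", "true")  // topic-level override

    // The topic-level value wins; with no override the broker-wide value applies, and with
    // neither present fromProps falls back to Defaults.UncleanLeaderElectionEnable (true).
    val effectiveConfig = LogConfig.fromProps(brokerProps, topicOverrides)
    println(effectiveConfig.uncleanLeaderElectionEnable)          // true in this sketch
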
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 73e605e..75ae1e1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -17,7 +17,9 @@
 package kafka.server
 
+import kafka.admin.AdminUtils
 import kafka.cluster.Broker
+import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{OffsetRequest, FetchResponsePartitionData}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
@@ -81,9 +83,21 @@ class ReplicaFetcherThread(name:String,
        */
       val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
       if (leaderEndOffset < replica.logEndOffset) {
+        // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
+        // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
+        // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
+        if (!LogConfig.fromProps(brokerConfig.props.props, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
+          topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+          // Log a fatal error and shut down the broker to ensure that data loss does not unexpectedly occur.
+          fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
+            " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
+            .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset))
+          Runtime.getRuntime.halt(1)
+        }
+
         replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
-        warn("Replica %d for partition %s reset its fetch offset to current leader %d's latest offset %d"
-          .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderEndOffset))
+        warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
+          .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderEndOffset))
         leaderEndOffset
       } else {
         /**
@@ -94,8 +108,8 @@ class ReplicaFetcherThread(name:String,
          */
         val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
         replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
-        warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d"
-          .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset))
+        warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
+          .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderStartOffset))
         leaderStartOffset
       }
     }
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index b585f0e..e86ee80 100644
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -15,7 +15,7 @@
 * limitations under the License.
 */
 
-package kafka.server
+package kafka.integration
 
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
@@ -27,6 +27,7 @@ import kafka.cluster.Broker
 import kafka.common.ErrorMapping
 import kafka.api._
 import kafka.admin.AdminUtils
+import kafka.server.{KafkaConfig, KafkaServer}
 
 class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
   val brokerId1 = 0
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
new file mode 100644
index 0000000..a38d293
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import scala.collection.mutable.MutableList
+import scala.util.Random
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import java.util.Properties
+import junit.framework.Assert._
+import kafka.admin.AdminUtils
+import kafka.common.FailedToSendMessageException
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
+import kafka.producer.{KeyedMessage, Producer}
+import kafka.serializer.{DefaultEncoder, StringEncoder}
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.Utils
+import kafka.utils.TestUtils._
+import kafka.zk.ZooKeeperTestHarness
+
+class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val brokerId1 = 0
+  val brokerId2 = 1
+
+  val port1 = choosePort()
+  val port2 = choosePort()
+
+  // controlled shutdown is needed for these tests, but we can trim the retry count and backoff interval to
+  // reduce test execution time
+  val enableControlledShutdown = true
+  val configProps1 = createBrokerConfig(brokerId1, port1)
+  val configProps2 = createBrokerConfig(brokerId2, port2)
+
+  for (configProps <- List(configProps1, configProps2)) {
+    configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown))
+    configProps.put("controlled.shutdown.max.retries", String.valueOf(1))
+    configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000))
+  }
+
+  var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig]
+  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+
+  val random = new Random()
+  val topic = "topic" + random.nextLong
+  val partitionId = 0
+
+  val kafkaApisLogger = Logger.getLogger(classOf[kafka.server.KafkaApis])
+  val networkProcessorLogger = Logger.getLogger(classOf[kafka.network.Processor])
+  val syncProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
+  val eventHandlerLogger = Logger.getLogger(classOf[kafka.producer.async.DefaultEventHandler[Object, Object]])
+
+  override def setUp() {
+    super.setUp()
+
+    // temporarily set loggers to a higher level so that tests run quietly
+    kafkaApisLogger.setLevel(Level.FATAL)
+    networkProcessorLogger.setLevel(Level.FATAL)
+    syncProducerLogger.setLevel(Level.FATAL)
+    eventHandlerLogger.setLevel(Level.FATAL)
+  }
+
+  override def tearDown() {
+    servers.map(server => shutdownServer(server))
+    servers.map(server => Utils.rm(server.config.logDirs))
+
+    // restore log levels
+    kafkaApisLogger.setLevel(Level.ERROR)
+    networkProcessorLogger.setLevel(Level.ERROR)
+    syncProducerLogger.setLevel(Level.ERROR)
+    eventHandlerLogger.setLevel(Level.ERROR)
+
+    super.tearDown()
+  }
+
+  private def startBrokers(cluster: Seq[Properties]) {
+    for (props <- cluster) {
+      val config = new KafkaConfig(props)
+      val server = createServer(config)
+      configs ++= List(config)
+      servers ++= List(server)
+    }
+  }
+
+  def testUncleanLeaderElectionEnabled {
+    // unclean leader election is enabled by default
+    startBrokers(Seq(configProps1, configProps2))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+
+    verifyUncleanLeaderElectionEnabled
+  }
+
+  def testUncleanLeaderElectionDisabled {
+    // disable unclean leader election
+    configProps1.put("unclean.leader.election.enable", String.valueOf(false))
+    configProps2.put("unclean.leader.election.enable", String.valueOf(false))
+    startBrokers(Seq(configProps1, configProps2))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+
+    verifyUncleanLeaderElectionDisabled
+  }
+
+  def testUncleanLeaderElectionEnabledByTopicOverride {
+    // disable unclean leader election globally, but enable for our specific test topic
+    configProps1.put("unclean.leader.election.enable", String.valueOf(false))
+    configProps2.put("unclean.leader.election.enable", String.valueOf(false))
+    startBrokers(Seq(configProps1, configProps2))
+
+    // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled
+    val topicProps = new Properties()
+    topicProps.put("unclean.leader.election.enable", String.valueOf(true))
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+      topicProps)
+
+    verifyUncleanLeaderElectionEnabled
+  }
+
+  def testCleanLeaderElectionDisabledByTopicOverride {
+    // enable unclean leader election globally, but disable for our specific test topic
+    configProps1.put("unclean.leader.election.enable", String.valueOf(true))
+    configProps2.put("unclean.leader.election.enable", String.valueOf(true))
+    startBrokers(Seq(configProps1, configProps2))
+
+    // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled
+    val topicProps = new Properties()
+    topicProps.put("unclean.leader.election.enable", String.valueOf(false))
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+      topicProps)
+
+    verifyUncleanLeaderElectionDisabled
+  }
+
+  def testUncleanLeaderElectionInvalidTopicOverride {
+    startBrokers(Seq(configProps1))
+
+    // create topic with an invalid value for unclean leader election
+    val topicProps = new Properties()
+    topicProps.put("unclean.leader.election.enable", "invalid")
+
+    intercept[IllegalArgumentException] {
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1)), topicProps)
+    }
+  }
+
+  def verifyUncleanLeaderElectionEnabled {
+    // wait until leader is elected
+    val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000)
+    assertTrue("Leader should get elected", leaderIdOpt.isDefined)
+    val leaderId = leaderIdOpt.get
+    debug("Leader for " + topic + " is elected to be: %s".format(leaderId))
+    assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2)
+
+    // the non-leader broker is the follower
+    val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
+    debug("Follower for " + topic + " is: %s".format(followerId))
+
+    produceMessage(topic, "first")
+    waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000)
+    assertEquals(List("first"), consumeAllMessages(topic))
+
+    // shutdown follower server
+    servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
+    produceMessage(topic, "second")
+    assertEquals(List("first", "second"), consumeAllMessages(topic))
+
+    // shutdown leader and then restart follower
+    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+    servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
+
+    // wait until new leader is (uncleanly) elected
+    val newLeaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId))
+    assertTrue("New leader should get elected", newLeaderIdOpt.isDefined)
+    assertEquals(followerId, newLeaderIdOpt.get)
+
+    produceMessage(topic, "third")
+
+    // second message was lost due to unclean election
+    assertEquals(List("first", "third"), consumeAllMessages(topic))
+  }
+
+  def verifyUncleanLeaderElectionDisabled {
+    // wait until leader is elected
+    val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000)
+    assertTrue("Leader should get elected", leaderIdOpt.isDefined)
+    val leaderId = leaderIdOpt.get
+    debug("Leader for " + topic + " is elected to be: %s".format(leaderId))
+    assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2)
+
+    // the non-leader broker is the follower
+    val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
+    debug("Follower for " + topic + " is: %s".format(followerId))
+
+    produceMessage(topic, "first")
+    waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000)
+    assertEquals(List("first"), consumeAllMessages(topic))
+
+    // shutdown follower server
+    servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
+
+    produceMessage(topic, "second")
+    assertEquals(List("first", "second"), consumeAllMessages(topic))
+
+    // shutdown leader and then restart follower
+    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+    servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
+
+    // verify that unclean election to non-ISR follower does not occur
+    val newLeaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId))
+    assertTrue("Leader should be defined", newLeaderIdOpt.isDefined)
+    assertEquals("No leader should be elected", -1, newLeaderIdOpt.get)
+
+    // message production and consumption should both fail while leader is down
+    intercept[FailedToSendMessageException] {
+      produceMessage(topic, "third")
+    }
+    assertEquals(List.empty[String], consumeAllMessages(topic))
+
+    // restart leader temporarily to send a successfully replicated message
+    servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
+    val newLeaderIdOpt2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(-1))
+    assertTrue("Leader should be defined", newLeaderIdOpt2.isDefined)
+    assertEquals("Original leader should be reelected", leaderId, newLeaderIdOpt2.get)
+    produceMessage(topic, "third")
+    waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000)
+    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
+
+    // verify clean leader transition to ISR follower
+    val newLeaderIdOpt3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId))
+    assertTrue("Leader should be defined", newLeaderIdOpt3.isDefined)
+    assertEquals("New leader should be elected", followerId, newLeaderIdOpt3.get)
+
+    // verify messages can be consumed from ISR follower that was just promoted to leader
+    assertEquals(List("first", "second", "third"), consumeAllMessages(topic))
+  }
+
+  private def shutdownServer(server: KafkaServer) = {
+    server.shutdown()
+    server.awaitShutdown()
+  }
+
+  private def produceMessage(topic: String, message: String) = {
+    val props = new Properties()
+    props.put("request.required.acks", String.valueOf(-1))
+    val producer: Producer[String, Array[Byte]] = createProducer(getBrokerListStrFromConfigs(configs),
+      new DefaultEncoder(), new StringEncoder(), props)
+    producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes))
+    producer.close()
+  }
+
+  private def consumeAllMessages(topic: String) : List[String] = {
+    // use a fresh consumer group every time so that we don't need to mess with disabling auto-commit or
+    // resetting the ZK offset
+    val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id", 1000)
+    val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
+    val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head
+
+    val messages = new MutableList[String]
+    val iter = messageStream.iterator
+    try {
+      while(iter.hasNext()) {
+        messages += new String(iter.next.message) // will throw a timeout exception if the message isn't there
+      }
+    } catch {
+      case e: ConsumerTimeoutException =>
+        debug("consumer timed out after receiving " + messages.length + " message(s).")
+    } finally {
+      consumerConnector.shutdown
+    }
+    messages.toList
+  }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 89c207a..6f4809d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -93,5 +93,40 @@ class KafkaConfigTest extends JUnit3Suite {
     assertEquals(serverConfig.advertisedHostName, advertisedHostName)
     assertEquals(serverConfig.advertisedPort, advertisedPort)
   }
-
+
+  @Test
+  def testUncleanLeaderElectionDefault() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    val serverConfig = new KafkaConfig(props)
+
+    assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
+  }
+
+  @Test
+  def testUncleanElectionDisabled() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("unclean.leader.election.enable", String.valueOf(false))
+    val serverConfig = new KafkaConfig(props)
+
+    assertEquals(serverConfig.uncleanLeaderElectionEnable, false)
+  }
+
+  @Test
+  def testUncleanElectionEnabled() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("unclean.leader.election.enable", String.valueOf(true))
+    val serverConfig = new KafkaConfig(props)
+
+    assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
+  }
+
+  @Test
+  def testUncleanElectionInvalid() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("unclean.leader.election.enable", "invalid")
+
+    intercept[IllegalArgumentException] {
+      new KafkaConfig(props)
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 772d214..2054c25 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -312,8 +312,8 @@ object TestUtils extends Logging {
    */
   def createProducer[K, V](brokerList: String,
                            encoder: Encoder[V] = new DefaultEncoder(),
-                           keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = {
-    val props = new Properties()
+                           keyEncoder: Encoder[K] = new DefaultEncoder(),
+                           props: Properties = new Properties()): Producer[K, V] = {
     props.put("metadata.broker.list", brokerList)
     props.put("send.buffer.bytes", "65536")
     props.put("connect.timeout.ms", "100000")
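
Taken together, the patch exposes the same switch at broker and topic scope, which the tests above exercise. A short usage sketch grounded in those tests (the topic name, port, replica assignment and the ZkClient parameter are illustrative placeholders; TestUtils.createBrokerConfig is the same test helper used in KafkaConfigTest):

    import java.util.Properties
    import org.I0Itec.zkclient.ZkClient
    import kafka.admin.AdminUtils
    import kafka.server.KafkaConfig
    import kafka.utils.TestUtils

    // Broker scope: defaults to true; set it to false to forbid unclean election for the whole broker.
    val brokerProps = TestUtils.createBrokerConfig(0, 9092)
    brokerProps.put("unclean.leader.election.enable", "false")
    val brokerConfig = new KafkaConfig(brokerProps)        // brokerConfig.uncleanLeaderElectionEnable == false

    // Topic scope: the same key can be passed as a topic config override at creation time, as in
    // testUncleanLeaderElectionEnabledByTopicOverride above; it then takes precedence for that topic.
    def createTopicAllowingUncleanElection(zkClient: ZkClient) {
      val topicProps = new Properties()
      topicProps.put("unclean.leader.election.enable", "true")
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, "my-topic", Map(0 -> Seq(0, 1)), topicProps)
    }
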