diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 9f6956e..630768a 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -54,12 +54,12 @@ object OffsetCommitRequest extends Logging {
         (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
       })
     })
-    OffsetCommitRequest(consumerGroupId, mutable.Map(pairs:_*), versionId, correlationId, clientId)
+    OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId)
   }
 }
 
 case class OffsetCommitRequest(groupId: String,
-                               requestInfo: mutable.Map[TopicAndPartition, OffsetAndMetadata],
+                               requestInfo: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                                versionId: Short = OffsetCommitRequest.CurrentVersion,
                                override val correlationId: Int = 0,
                                clientId: String = OffsetCommitRequest.DefaultClientId)
diff --git a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala b/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala
index b66c8fc..a1e1279 100644
--- a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala
+++ b/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala
@@ -20,8 +20,7 @@ package kafka.common
 
 /**
  * This exception is thrown by the leader elector in the controller when leader election fails for a partition since
- * all the leader candidate replicas for a partition are offline; the set of candidates may or may not be limited
- * to just the in sync replicas depending upon whether unclean leader election is allowed to occur.
+ * all the replicas for a partition are offline
  */
 class NoReplicaOnlineException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
   def this(message: String) = this(message, null)
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 9a3db90..ff5e819 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -286,7 +286,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       while (!done) {
         val committed = offsetsChannelLock synchronized {
           // committed when we receive either no error codes or only MetadataTooLarge errors
-          val offsetsToCommit = mutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
+          val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
             partitionTopicInfos.filterNot { case (partition, info) =>
               val newOffset = info.getConsumeOffset()
               newOffset == checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 7dc2718..5db24a7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -21,12 +21,10 @@ import collection.Set
 import com.yammer.metrics.core.Gauge
 import java.lang.{IllegalStateException, Object}
 import java.util.concurrent.TimeUnit
-import kafka.admin.AdminUtils
 import kafka.admin.PreferredReplicaLeaderElectionCommand
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
-import kafka.log.LogConfig
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
@@ -166,7 +164,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   // kafka server
   private val autoRebalanceScheduler = new KafkaScheduler(1)
   var deleteTopicManager: TopicDeletionManager = null
-  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
+  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
@@ -974,19 +972,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       if (leaderAndIsr.isr.contains(replicaId)) {
         // if the replica to be removed from the ISR is also the leader, set the new leader value to -1
         val newLeader = if(replicaId == leaderAndIsr.leader) -1 else leaderAndIsr.leader
-        var newIsr = leaderAndIsr.isr.filter(b => b != replicaId)
-
-        // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
-        // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
-        // eventually be restored as the leader.
-        if (newIsr.isEmpty && !LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(zkClient,
-          topicAndPartition.topic)).uncleanLeaderElectionEnable) {
-          info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
-          newIsr = leaderAndIsr.isr
-        }
-
         val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1,
-          newIsr, leaderAndIsr.zkVersion + 1)
+          leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
         // update the new leadership decision in zookeeper or retry
         val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
           zkClient,
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index d3b25fa..fa29bbe 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -16,12 +16,9 @@
  */
 package kafka.controller
 
-import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
-import kafka.log.LogConfig
 import kafka.utils.Logging
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
-import kafka.server.KafkaConfig
 
 trait PartitionLeaderSelector {
 
@@ -40,14 +37,12 @@ trait PartitionLeaderSelector {
  * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
  * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live
  *    isr as the new isr.
- * 2. Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException.
- * 3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
- * 4. If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException
+ * 2. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
+ * 3. If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException
  * Replicas to receive LeaderAndIsr request = live assigned replicas
  * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
  */
-class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
-  extends PartitionLeaderSelector with Logging {
+class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
   this.logIdent = "[OfflinePartitionLeaderSelector]: "
 
   def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
@@ -59,15 +54,6 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
       val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
       val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
         case true =>
-          // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
-          // for unclean leader election.
-          if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
-            topicAndPartition.topic)).uncleanLeaderElectionEnable) {
-            throw new NoReplicaOnlineException(("No broker in ISR for partition " +
-              "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
-              " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
-          }
-
           debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
             .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
           liveAssignedReplicas.isEmpty match {
@@ -91,7 +77,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
           info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
           (newLeaderAndIsr, liveAssignedReplicas)
         case None =>
-          throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
+          throw new NoReplicaOnlineException("Partition %s doesn't have ".format(topicAndPartition) + "replicas assigned to it")
     }
   }
 }
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 6de320d..08dcc55 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -25,10 +25,10 @@ class OffsetCommitRequest(groupId: String,
                           correlationId: Int,
                           clientId: String) {
   val underlying = {
-    val scalaMap: collection.mutable.Map[TopicAndPartition, OffsetAndMetadata] = {
+    val scalaMap: collection.immutable.Map[TopicAndPartition, OffsetAndMetadata] = {
       import collection.JavaConversions._
 
-      collection.JavaConversions.asMap(requestInfo)
+      requestInfo.toMap
     }
     kafka.api.OffsetCommitRequest(
       groupId = groupId,
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 5746ad4..18c86fe 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,23 +21,6 @@ import java.util.Properties
 import scala.collection._
 import kafka.common._
 
-object Defaults {
-  val SegmentSize = 1024 * 1024
-  val SegmentMs = Long.MaxValue
-  val FlushInterval = Long.MaxValue
-  val FlushMs = Long.MaxValue
-  val RetentionSize = Long.MaxValue
-  val RetentionMs = Long.MaxValue
-  val MaxMessageSize = Int.MaxValue
-  val MaxIndexSize = 1024 * 1024
-  val IndexInterval = 4096
-  val FileDeleteDelayMs = 60 * 1000L
-  val DeleteRetentionMs = 24 * 60 * 60 * 1000L
-  val MinCleanableDirtyRatio = 0.5
-  val Compact = false
-  val UncleanLeaderElectionEnable = true
-}
-
 /**
  * Configuration settings for a log
  * @param segmentSize The soft maximum for the size of a segment file in the log
@@ -52,23 +35,20 @@ object Defaults {
  * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
  * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
  * @param compact Should old segments in this log be deleted or deduplicated?
- * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
-   but included here for topic-specific configuration validation purposes
  */
-case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
-                     val segmentMs: Long = Defaults.SegmentMs,
-                     val flushInterval: Long = Defaults.FlushInterval,
-                     val flushMs: Long = Defaults.FlushMs,
-                     val retentionSize: Long = Defaults.RetentionSize,
-                     val retentionMs: Long = Defaults.RetentionMs,
-                     val maxMessageSize: Int = Defaults.MaxMessageSize,
-                     val maxIndexSize: Int = Defaults.MaxIndexSize,
-                     val indexInterval: Int = Defaults.IndexInterval,
-                     val fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
-                     val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
-                     val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
-                     val compact: Boolean = Defaults.Compact,
-                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) {
+case class LogConfig(val segmentSize: Int = 1024*1024,
+                     val segmentMs: Long = Long.MaxValue,
+                     val flushInterval: Long = Long.MaxValue,
+                     val flushMs: Long = Long.MaxValue,
+                     val retentionSize: Long = Long.MaxValue,
+                     val retentionMs: Long = Long.MaxValue,
+                     val maxMessageSize: Int = Int.MaxValue,
+                     val maxIndexSize: Int = 1024*1024,
+                     val indexInterval: Int = 4096,
+                     val fileDeleteDelayMs: Long = 60*1000,
+                     val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
+                     val minCleanableRatio: Double = 0.5,
+                     val compact: Boolean = false) {
 
   def toProps: Properties = {
     val props = new Properties()
@@ -86,7 +66,6 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
     props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
     props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
-    props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
     props
   }
 
@@ -106,7 +85,6 @@ object LogConfig {
   val FileDeleteDelayMsProp = "file.delete.delay.ms"
   val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
   val CleanupPolicyProp = "cleanup.policy"
-  val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
 
   val ConfigNames = Set(SegmentBytesProp,
                         SegmentMsProp,
@@ -120,31 +98,26 @@ object LogConfig {
                         FileDeleteDelayMsProp,
                         DeleteRetentionMsProp,
                         MinCleanableDirtyRatioProp,
-                        CleanupPolicyProp,
-                        UncleanLeaderElectionEnableProp)
+                        CleanupPolicyProp)
 
   /**
    * Parse the given properties instance into a LogConfig object
    */
 
   def fromProps(props: Properties): LogConfig = {
-    new LogConfig(segmentSize = props.getProperty(SegmentBytesProp, Defaults.SegmentSize.toString).toInt,
-                  segmentMs = props.getProperty(SegmentMsProp, Defaults.SegmentMs.toString).toLong,
-                  maxIndexSize = props.getProperty(SegmentIndexBytesProp, Defaults.MaxIndexSize.toString).toInt,
-                  flushInterval = props.getProperty(FlushMessagesProp, Defaults.FlushInterval.toString).toLong,
-                  flushMs = props.getProperty(FlushMsProp, Defaults.FlushMs.toString).toLong,
-                  retentionSize = props.getProperty(RetentionBytesProp, Defaults.RetentionSize.toString).toLong,
-                  retentionMs = props.getProperty(RententionMsProp, Defaults.RetentionMs.toString).toLong,
-                  maxMessageSize = props.getProperty(MaxMessageBytesProp, Defaults.MaxMessageSize.toString).toInt,
-                  indexInterval = props.getProperty(IndexIntervalBytesProp, Defaults.IndexInterval.toString).toInt,
-                  fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString).toInt,
-                  deleteRetentionMs = props.getProperty(DeleteRetentionMsProp, Defaults.DeleteRetentionMs.toString).toLong,
-                  minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp,
-                                                        Defaults.MinCleanableDirtyRatio.toString).toDouble,
-                  compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
-                            .trim.toLowerCase != "delete",
-                  uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
-                                                                  Defaults.UncleanLeaderElectionEnable.toString).toBoolean)
+    new LogConfig(segmentSize = props.getProperty(SegmentBytesProp).toInt,
+                  segmentMs = props.getProperty(SegmentMsProp).toLong,
+                  maxIndexSize = props.getProperty(SegmentIndexBytesProp).toInt,
+                  flushInterval = props.getProperty(FlushMessagesProp).toLong,
+                  flushMs = props.getProperty(FlushMsProp).toLong,
+                  retentionSize = props.getProperty(RetentionBytesProp).toLong,
+                  retentionMs = props.getProperty(RententionMsProp).toLong,
+                  maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt,
+                  indexInterval = props.getProperty(IndexIntervalBytesProp).toInt,
+                  fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
+                  deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong,
+                  minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
+                  compact = props.getProperty(CleanupPolicyProp).trim.toLowerCase != "delete")
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 08de0ef..d07796e 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -231,9 +231,6 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency with which the partition rebalance check is triggered by the controller */
   val leaderImbalanceCheckIntervalSeconds = props.getInt("leader.imbalance.check.interval.seconds", 300)
 
-  /* indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though
-   * doing so may result in data loss */
-  val uncleanLeaderElectionEnable = props.getBoolean("unclean.leader.election.enable", true)
 
   /*********** Controlled shutdown configuration ***********/
 
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 75ae1e1..73e605e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -17,9 +17,7 @@
 
 package kafka.server
 
-import kafka.admin.AdminUtils
 import kafka.cluster.Broker
-import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{OffsetRequest, FetchResponsePartitionData}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
@@ -83,21 +81,9 @@ class ReplicaFetcherThread(name:String,
      */
    val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
    if (leaderEndOffset < replica.logEndOffset) {
-      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
-      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
-      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
-      if (!LogConfig.fromProps(brokerConfig.props.props, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
-        topicAndPartition.topic)).uncleanLeaderElectionEnable) {
-        // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
-        fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
-          " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
-          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset))
-        Runtime.getRuntime.halt(1)
-      }
-
      replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
-      warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
-        .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderEndOffset))
+      warn("Replica %d for partition %s reset its fetch offset to current leader %d's latest offset %d"
+        .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderEndOffset))
      leaderEndOffset
    } else {
      /**
@@ -108,8 +94,8 @@ class ReplicaFetcherThread(name:String,
      */
    val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
-    warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
-      .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderStartOffset))
+    warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d"
+      .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset))
    leaderStartOffset
   }
 }
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 83317f0..c468419 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -66,7 +66,7 @@ object TestOffsetManager {
     }
 
     override def doWork() {
-      val commitRequest = OffsetCommitRequest(group, mutable.Map((1 to partitionCount).map(TopicAndPartition("topic-" + id, _) -> OffsetAndMetadata(offset, metadata)):_*))
+      val commitRequest = OffsetCommitRequest(group, immutable.Map((1 to partitionCount).map(TopicAndPartition("topic-" + id, _) -> OffsetAndMetadata(offset, metadata)):_*))
       try {
         ensureConnected()
         offsetsChannel.send(commitRequest)
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 5378446..d39a9a4 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -147,7 +147,7 @@ object SerializationTestUtils {
   }
 
   def createTestOffsetCommitRequest: OffsetCommitRequest = {
-    new OffsetCommitRequest("group 1", collection.mutable.Map(
+    new OffsetCommitRequest("group 1", collection.immutable.Map(
      TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
      TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds)
    ))
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index e86ee80..b585f0e 100644
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.integration
+package kafka.server
 
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
@@ -27,7 +27,6 @@ import kafka.cluster.Broker
 import kafka.common.ErrorMapping
 import kafka.api._
 import kafka.admin.AdminUtils
-import kafka.server.{KafkaConfig, KafkaServer}
 
 class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
   val brokerId1 = 0
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
deleted file mode 100644
index c5f2da9..0000000
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import scala.collection.mutable.MutableList
-import scala.util.Random
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
-import java.util.Properties
-import junit.framework.Assert._
-import kafka.admin.AdminUtils
-import kafka.common.FailedToSendMessageException
-import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
-import kafka.producer.{KeyedMessage, Producer}
-import kafka.serializer.{DefaultEncoder, StringEncoder}
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.Utils
-import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
-
-class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val brokerId1 = 0
-  val brokerId2 = 1
-
-  val port1 = choosePort()
-  val port2 = choosePort()
-
-  // controlled shutdown is needed for these tests, but we can trim the retry count and backoff interval to
-  // reduce test execution time
-  val enableControlledShutdown = true
-  val configProps1 = createBrokerConfig(brokerId1, port1)
-  val configProps2 = createBrokerConfig(brokerId2, port2)
-
-  for (configProps <- List(configProps1, configProps2)) {
-    configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown))
-    configProps.put("controlled.shutdown.max.retries", String.valueOf(1))
-    configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000))
-  }
-
-  var configs: Seq[KafkaConfig] = Seq.empty[KafkaConfig]
-  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
-
-  val random = new Random()
-  val topic = "topic" + random.nextLong
-  val partitionId = 0
-
-  val kafkaApisLogger = Logger.getLogger(classOf[kafka.server.KafkaApis])
-  val networkProcessorLogger = Logger.getLogger(classOf[kafka.network.Processor])
-  val syncProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
-  val eventHandlerLogger = Logger.getLogger(classOf[kafka.producer.async.DefaultEventHandler[Object, Object]])
-
-  override def setUp() {
-    super.setUp()
-
-    // temporarily set loggers to a higher level so that tests run quietly
-    kafkaApisLogger.setLevel(Level.FATAL)
-    networkProcessorLogger.setLevel(Level.FATAL)
-    syncProducerLogger.setLevel(Level.FATAL)
-    eventHandlerLogger.setLevel(Level.FATAL)
-  }
-
-  override def tearDown() {
-    servers.map(server => shutdownServer(server))
-    servers.map(server => Utils.rm(server.config.logDirs))
-
-    // restore log levels
-    kafkaApisLogger.setLevel(Level.ERROR)
-    networkProcessorLogger.setLevel(Level.ERROR)
-    syncProducerLogger.setLevel(Level.ERROR)
-    eventHandlerLogger.setLevel(Level.ERROR)
-
-    super.tearDown()
-  }
-
-  private def startBrokers(cluster: Seq[Properties]) {
-    for (props <- cluster) {
-      val config = new KafkaConfig(props)
-      val server = createServer(config)
-      configs ++= List(config)
-      servers ++= List(server)
-    }
-  }
-
-  def testUncleanLeaderElectionEnabled {
-    // unclean leader election is enabled by default
-    startBrokers(Seq(configProps1, configProps2))
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
-
-    verifyUncleanLeaderElectionEnabled
-  }
-
-  def testUncleanLeaderElectionDisabled {
-    // disable unclean leader election
-    configProps1.put("unclean.leader.election.enable", String.valueOf(false))
-    configProps2.put("unclean.leader.election.enable", String.valueOf(false))
-    startBrokers(Seq(configProps1, configProps2))
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
-
-    verifyUncleanLeaderElectionDisabled
-  }
-
-  def testUncleanLeaderElectionEnabledByTopicOverride {
-    // disable unclean leader election globally, but enable for our specific test topic
-    configProps1.put("unclean.leader.election.enable", String.valueOf(false))
-    configProps2.put("unclean.leader.election.enable", String.valueOf(false))
-    startBrokers(Seq(configProps1, configProps2))
-
-    // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled
-    val topicProps = new Properties()
-    topicProps.put("unclean.leader.election.enable", String.valueOf(true))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
-      topicProps)
-
-    verifyUncleanLeaderElectionEnabled
-  }
-
-  def testCleanLeaderElectionDisabledByTopicOverride {
-    // enable unclean leader election globally, but disable for our specific test topic
-    configProps1.put("unclean.leader.election.enable", String.valueOf(true))
-    configProps2.put("unclean.leader.election.enable", String.valueOf(true))
-    startBrokers(Seq(configProps1, configProps2))
-
-    // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled
-    val topicProps = new Properties()
-    topicProps.put("unclean.leader.election.enable", String.valueOf(false))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
-      topicProps)
-
-    verifyUncleanLeaderElectionDisabled
-  }
-
-  def testUncleanLeaderElectionInvalidTopicOverride {
-    startBrokers(Seq(configProps1))
-
-    // create topic with an invalid value for unclean leader election
-    val topicProps = new Properties()
-    topicProps.put("unclean.leader.election.enable", "invalid")
-
-    intercept[IllegalArgumentException] {
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1)), topicProps)
-    }
-  }
-
-  def verifyUncleanLeaderElectionEnabled {
-    // wait until leader is elected
-    val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000)
-    assertTrue("Leader should get elected", leaderIdOpt.isDefined)
-    val leaderId = leaderIdOpt.get
-    debug("Leader for " + topic + " is elected to be: %s".format(leaderId))
-    assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2)
-
-    // the non-leader broker is the follower
-    val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
-    debug("Follower for " + topic + " is: %s".format(followerId))
-
-    produceMessage(topic, "first")
-    waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000)
-    assertEquals(List("first"), consumeAllMessages(topic))
-
-    // shutdown follower server
-    servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
-
-    produceMessage(topic, "second")
-    assertEquals(List("first", "second"), consumeAllMessages(topic))
-
-    // shutdown leader and then restart follower
-    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
-    servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
-
-    // wait until new leader is (uncleanly) elected
-    val newLeaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId))
-    assertTrue("New leader should get elected", newLeaderIdOpt.isDefined)
-    assertEquals(followerId, newLeaderIdOpt.get)
-
-    produceMessage(topic, "third")
-
-    // second message was lost due to unclean election
-    assertEquals(List("first", "third"), consumeAllMessages(topic))
-  }
-
-  def verifyUncleanLeaderElectionDisabled {
-    // wait until leader is elected
-    val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000)
-    assertTrue("Leader should get elected", leaderIdOpt.isDefined)
-    val leaderId = leaderIdOpt.get
-    debug("Leader for " + topic + " is elected to be: %s".format(leaderId))
-    assertTrue("Leader id is set to expected value for topic: " + topic, leaderId == brokerId1 || leaderId == brokerId2)
-
-    // the non-leader broker is the follower
-    val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1
-    debug("Follower for " + topic + " is: %s".format(followerId))
-
-    produceMessage(topic, "first")
-    waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000)
-    assertEquals(List("first"), consumeAllMessages(topic))
-
-    // shutdown follower server
-    servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server))
-
-    produceMessage(topic, "second")
-    assertEquals(List("first", "second"), consumeAllMessages(topic))
-
-    // shutdown leader and then restart follower
-    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
-    servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
-
-    // verify that unclean election to non-ISR follower does not occur
-    val newLeaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId))
-    assertTrue("Leader should be defined", newLeaderIdOpt.isDefined)
-    assertEquals("No leader should be elected", -1, newLeaderIdOpt.get)
-
-    // message production and consumption should both fail while leader is down
-    intercept[FailedToSendMessageException] {
-      produceMessage(topic, "third")
-    }
-    assertEquals(List.empty[String], consumeAllMessages(topic))
-
-    // restart leader temporarily to send a successfully replicated message
-    servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
-    val newLeaderIdOpt2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(-1))
-    assertTrue("Leader should be defined", newLeaderIdOpt2.isDefined)
-    assertEquals("Original leader should be reelected", leaderId, newLeaderIdOpt2.get)
-    produceMessage(topic, "third")
-    waitUntilMetadataIsPropagated(servers, topic, partitionId, 1000)
-    servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
-
-    // verify clean leader transition to ISR follower
-    val newLeaderIdOpt3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1000, Some(leaderId))
-    assertTrue("Leader should be defined", newLeaderIdOpt3.isDefined)
-    assertEquals("New leader should be elected", followerId, newLeaderIdOpt3.get)
-
-    // verify messages can be consumed from ISR follower that was just promoted to leader
-    assertEquals(List("first", "second", "third"), consumeAllMessages(topic))
-  }
-
-  private def shutdownServer(server: KafkaServer) = {
-    server.shutdown()
-    server.awaitShutdown()
-  }
-
-  private def produceMessage(topic: String, message: String) = {
-    val props = new Properties()
-    props.put("request.required.acks", String.valueOf(-1))
-    val producer: Producer[String, Array[Byte]] = createProducer(getBrokerListStrFromConfigs(configs),
-      new DefaultEncoder(), new StringEncoder(), props)
-    producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes))
-    producer.close()
-  }
-
-  private def consumeAllMessages(topic: String) : List[String] = {
-    // use a fresh consumer group every time so that we don't need to mess with disabling auto-commit or
-    // resetting the ZK offset
-    val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id", 1000)
-    val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
-    val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head
-
-    val messages = new MutableList[String]
-    val iter = messageStream.iterator
-    try {
-      while(iter.hasNext()) {
-        messages += new String(iter.next.message) // will throw a timeout exception if the message isn't there
-      }
-    } catch {
-      case e: ConsumerTimeoutException =>
-        debug("consumer timed out after receiving " + messages.length + " message(s).")
-    } finally {
-      consumerConnector.shutdown
-    }
-    messages.toList
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 6f4809d..89c207a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -93,40 +93,5 @@ class KafkaConfigTest extends JUnit3Suite {
     assertEquals(serverConfig.advertisedHostName, advertisedHostName)
     assertEquals(serverConfig.advertisedPort, advertisedPort)
   }
-
-  @Test
-  def testUncleanLeaderElectionDefault() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-    val serverConfig = new KafkaConfig(props)
-
-    assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
-  }
-
-  @Test
-  def testUncleanElectionDisabled() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-    props.put("unclean.leader.election.enable", String.valueOf(false))
-    val serverConfig = new KafkaConfig(props)
-
-    assertEquals(serverConfig.uncleanLeaderElectionEnable, false)
-  }
-
-  @Test
-  def testUncleanElectionEnabled() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-    props.put("unclean.leader.election.enable", String.valueOf(true))
-    val serverConfig = new KafkaConfig(props)
-
-    assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
-  }
-
-  @Test
-  def testUncleanElectionInvalid() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-    props.put("unclean.leader.election.enable", "invalid")
-
-    intercept[IllegalArgumentException] {
-      new KafkaConfig(props)
-    }
-  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index e632997..ae9bb3a 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -81,7 +81,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
-    val commitRequest = OffsetCommitRequest("test-group", mutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
+    val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
@@ -95,7 +95,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(42L, fetchResponse.requestInfo.get(topicAndPartition).get.offset)
 
     // Commit a new offset
-    val commitRequest1 = OffsetCommitRequest(group, mutable.Map(topicAndPartition -> OffsetAndMetadata(
+    val commitRequest1 = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=100L,
       metadata="some metadata"
     )))
@@ -120,7 +120,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val topic3 = "topic-3"
     val topic4 = "topic-4"
 
-    val commitRequest = OffsetCommitRequest("test-group", mutable.Map(
+    val commitRequest = OffsetCommitRequest("test-group", immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata one"),
       TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=43L, metadata="metadata two"),
       TopicAndPartition(topic3, 0) -> OffsetAndMetadata(offset=44L, metadata="metadata three"),
@@ -172,7 +172,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     var leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicAndPartition.topic, 0, 1000)
     assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
 
-    val commitRequest = OffsetCommitRequest("test-group", mutable.Map(topicAndPartition -> OffsetAndMetadata(
+    val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=42L,
       metadata=random.nextString(server.config.offsetMetadataMaxSize)
     )))
@@ -180,7 +180,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
 
-    val commitRequest1 = OffsetCommitRequest(group, mutable.Map(topicAndPartition -> OffsetAndMetadata(
+    val commitRequest1 = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=42L,
       metadata=random.nextString(server.config.offsetMetadataMaxSize + 1)
     )))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2054c25..772d214 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -312,8 +312,8 @@ object TestUtils extends Logging {
    */
  def createProducer[K, V](brokerList: String,
                           encoder: Encoder[V] = new DefaultEncoder(),
-                          keyEncoder: Encoder[K] = new DefaultEncoder(),
-                          props: Properties = new Properties()): Producer[K, V] = {
+                          keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = {
+    val props = new Properties()
    props.put("metadata.broker.list", brokerList)
    props.put("send.buffer.bytes", "65536")
    props.put("connect.timeout.ms", "100000")