From 3b8023b4b3f14eef3f46338c9dfadef228d2f186 Mon Sep 17 00:00:00 2001 From: mgharat Date: Wed, 27 Aug 2014 19:28:10 -0700 Subject: [PATCH 1/4] Converted mapValues to map wherever necessary so that local modifications to collections will not be lost. mapValues is used only if the variables are used locally : KAFKA-1610 --- .../kafka/admin/ReassignPartitionsCommand.scala | 3 +- .../scala/kafka/controller/KafkaController.scala | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 2 +- .../src/main/scala/kafka/server/DelayedFetch.scala | 2 +- .../main/scala/kafka/server/DelayedProduce.scala | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 8 +- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- .../kafka/tools/ReplicaVerificationTool.scala | 156 +++++++++++---------- .../unit/kafka/server/LeaderElectionTest.scala | 3 +- 9 files changed, 91 insertions(+), 89 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 691d69a..25ecc9e 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -17,6 +17,7 @@ package kafka.admin import joptsimple.OptionParser +import kafka.controller.ReassignedPartitionsContext import kafka.utils._ import collection._ import org.I0Itec.zkclient.ZkClient @@ -120,7 +121,7 @@ object ReassignPartitionsCommand extends Logging { private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) :Map[TopicAndPartition, ReassignmentStatus] = { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).map(p => p._1 -> p._2.newReplicas) partitionsToBeReassigned.map { topicAndPartition => (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 8ab4a1b..3670530 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -929,7 +929,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // remove this partition from that list val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition // write the new list to zookeeper - ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) + ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.map(p => p._1 -> p._2.newReplicas)) // update the cache. 
NO-OP if the partition's reassignment was never started controllerContext.partitionsBeingReassigned.remove(topicAndPartition) } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4d2924d..990a5ca 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -314,7 +314,7 @@ class LogManager(val logDirs: Array[File], private def checkpointLogsInDir(dir: File): Unit = { val recoveryPoints = this.logsByDir.get(dir.toString) if (recoveryPoints.isDefined) { - this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) + this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.map(p => p._1 -> p._2.recoveryPoint)) } } diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index e0f14e2..f29522a 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -86,6 +86,6 @@ class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], def respond(replicaManager: ReplicaManager): FetchResponse = { val topicData = replicaManager.readMessageSets(fetch) - FetchResponse(fetch.correlationId, topicData.mapValues(_.data)) + FetchResponse(fetch.correlationId, topicData.map(p => p._1 -> p._2.data)) } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 9481508..84a63d1 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -57,7 +57,7 @@ class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey], } def respond(offsetManager: OffsetManager): RequestOrResponse = { - val responseStatus = partitionStatus.mapValues(status => status.responseStatus) + val responseStatus = partitionStatus.map(topicPartitionAndResponseStatus => topicPartitionAndResponseStatus._1 -> topicPartitionAndResponseStatus._2.responseStatus) val errorCode = responseStatus.find { case (_, status) => status.error != ErrorMapping.NoError diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c584b55..ccf1da3 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -304,7 +304,7 @@ class KafkaApis(val requestChannel: RequestChannel, // if the fetch request comes from the follower, // update its corresponding log end offset if(fetchRequest.isFromFollower) - recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset)) + recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.map(p => p._1 -> p._2.offset)) // check if this fetch request can be satisfied right away val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum @@ -320,16 +320,14 @@ class KafkaApis(val requestChannel: RequestChannel, errorReadingData) { debug("Returning fetch response %s for fetch request with correlation id %d to client %s" .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) - val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data)) + val response = new FetchResponse(fetchRequest.correlationId, dataRead.map(tup => tup._1->tup._2.data)) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { 
debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId, fetchRequest.clientId)) // create a list of (topic, partition) pairs to use as keys for this delayed request val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_)) - val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, - dataRead.mapValues(_.offset)) - + val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.map(tup => tup._1 -> tup._2.offset)) // add the fetch request for watch if it's not satisfied, otherwise send the response back val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) if (satisfiedByMe) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 2871118..3eb7962 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -289,7 +289,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg minCleanableRatio = config.logCleanerMinCleanRatio, compact = config.logCleanupPolicy.trim.toLowerCase == "compact") val defaultProps = defaultLogConfig.toProps - val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) + val configs = AdminUtils.fetchAllTopicConfigs(zkClient).map(topicConfigs => topicConfigs._1 -> LogConfig.fromProps(defaultProps, topicConfigs._2)) // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, dedupeBufferSize = config.logCleanerDedupeBufferSize, diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index af47836..71e82f6 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -32,20 +32,20 @@ import kafka.utils._ import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer} /** - * For verifying the consistency among replicas. + * For verifying the consistency among replicas. * - * 1. start a fetcher on every broker. - * 2. each fetcher does the following - * 2.1 issues fetch request - * 2.2 puts the fetched result in a shared buffer - * 2.3 waits for all other fetchers to finish step 2.2 - * 2.4 one of the fetchers verifies the consistency of fetched results among replicas + * 1. start a fetcher on every broker. + * 2. each fetcher does the following + * 2.1 issues fetch request + * 2.2 puts the fetched result in a shared buffer + * 2.3 waits for all other fetchers to finish step 2.2 + * 2.4 one of the fetchers verifies the consistency of fetched results among replicas * - * The consistency verification is up to the high watermark. The tool reports the - * max lag between the verified offset and the high watermark among all partitions. + * The consistency verification is up to the high watermark. 
The tool reports the + * max lag between the verified offset and the high watermark among all partitions. * - * If a broker goes down, the verification of the partitions on that broker is delayed - * until the broker is up again. + * If a broker goes down, the verification of the partitions on that broker is delayed + * until the broker is up again. * * Caveats: * 1. The tools needs all brokers to be up at startup time. @@ -53,7 +53,7 @@ import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer} */ object ReplicaVerificationTool extends Logging { - val clientId= "replicaVerificationTool" + val clientId = "replicaVerificationTool" val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS" val dateFormat = new SimpleDateFormat(dateFormatString) @@ -64,39 +64,39 @@ object ReplicaVerificationTool extends Logging { def main(args: Array[String]): Unit = { val parser = new OptionParser val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") - .withRequiredArg - .describedAs("hostname:port,...,hostname:port") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("hostname:port,...,hostname:port") + .ofType(classOf[String]) val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.") - .withRequiredArg - .describedAs("bytes") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.FetchSize) + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(ConsumerConfig.FetchSize) val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1000) val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. 
Defaults to all topics.") - .withRequiredArg - .describedAs("Java regex (String)") - .ofType(classOf[String]) - .defaultsTo(".*") + .withRequiredArg + .describedAs("Java regex (String)") + .ofType(classOf[String]) + .defaultsTo(".*") val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.") - .withRequiredArg - .describedAs("timestamp/-1(latest)/-2(earliest)") - .ofType(classOf[java.lang.Long]) - .defaultsTo(-1L) + .withRequiredArg + .describedAs("timestamp/-1(latest)/-2(earliest)") + .ofType(classOf[java.lang.Long]) + .defaultsTo(-1L) val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Long]) - .defaultsTo(30 * 1000L) - - if(args.length == 0) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(30 * 1000L) + + if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.") - val options = parser.parse(args : _*) + val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) val regex = options.valueOf(topicWhiteListOpt) @@ -120,10 +120,10 @@ object ReplicaVerificationTool extends Logging { val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( - topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) - true - else - false + topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) + true + else + false ) val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap( topicMetadataResponse => @@ -136,42 +136,43 @@ object ReplicaVerificationTool extends Logging { debug("Selected topic partitions: " + topicPartitionReplicaList) val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId) .map { case (brokerId, partitions) => - brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } } + brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId)} + } debug("Topic partitions per broker: " + topicAndPartitionsPerBroker) val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = - topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) - .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } + topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) + .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size} debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) + val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.map( partitionMetadata => (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)) ).groupBy(_._2) - .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { - case(topicAndPartition, leaderId) => topicAndPartition }) + .map {case 
(leaderId,topicAndPartitionAndLeaderIds) => leaderId -> topicAndPartitionAndLeaderIds.map{case(topicAndPartition, leaderId) => topicAndPartition}} debug("Leaders per broker: " + leadersPerBroker) val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, - leadersPerBroker, - topicAndPartitionsPerBroker.size, - brokerMap, - initialOffsetTime, - reportInterval) + leadersPerBroker, + topicAndPartitionsPerBroker.size, + brokerMap, + initialOffsetTime, + reportInterval) // create all replica fetcher threads val verificationBrokerId = topicAndPartitionsPerBroker.head._1 val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map { case (brokerId, topicAndPartitions) => new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId, - sourceBroker = brokerMap(brokerId), - topicAndPartitions = topicAndPartitions, - replicaBuffer = replicaBuffer, - socketTimeout = 30000, - socketBufferSize = 256000, - fetchSize = fetchSize, - maxWait = maxWaitMs, - minBytes = 1, - doVerification = (brokerId == verificationBrokerId)) + sourceBroker = brokerMap(brokerId), + topicAndPartitions = topicAndPartitions, + replicaBuffer = replicaBuffer, + socketTimeout = 30000, + socketBufferSize = 256000, + fetchSize = fetchSize, + maxWait = maxWaitMs, + minBytes = 1, + doVerification = (brokerId == verificationBrokerId)) } Runtime.getRuntime.addShutdownHook(new Thread() { @@ -186,7 +187,7 @@ object ReplicaVerificationTool extends Logging { } } -private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) +private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) private case class ReplicaAndMessageIterator(replicaId: Int, iterator: Iterator[MessageAndOffset]) @@ -241,7 +242,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa val offsetRequest = OffsetRequest(initialOffsetMap) val offsetResponse = consumer.getOffsetsBefore(offsetRequest) assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse)) - offsetResponse.partitionErrorAndOffsets.foreach{ + offsetResponse.partitionErrorAndOffsets.foreach { case (topicAndPartition, partitionOffsetResponse) => fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head) } @@ -262,18 +263,19 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) { debug("Verifying " + topicAndPartition) assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition), - "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " - + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") + "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " + + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") val messageIteratorMap = fetchResponsePerReplica.map { - case(replicaId, fetchResponse) => - replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator} + case (replicaId, fetchResponse) => + replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator + } val maxHw = fetchResponsePerReplica.values.map(_.hw).max // Iterate one message at a time from every replica, until high watermark is reached. 
var isMessageInAllReplicas = true while (isMessageInAllReplicas) { var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None - for ( (replicaId, messageIterator) <- messageIteratorMap) { + for ((replicaId, messageIterator) <- messageIteratorMap) { try { if (messageIterator.hasNext) { val messageAndOffset = messageIterator.next() @@ -285,7 +287,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa messageInfoFromFirstReplicaOpt match { case None => messageInfoFromFirstReplicaOpt = Some( - MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum)) + MessageInfo(replicaId, messageAndOffset.offset, messageAndOffset.nextOffset, messageAndOffset.message.checksum)) case Some(messageInfoFromFirstReplica) => if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) { println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition @@ -306,14 +308,14 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } catch { case t: Throwable => throw new RuntimeException("Error in processing replica %d in partition %s at offset %d." - .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) + .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) } } if (isMessageInAllReplicas) { val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset fetchOffsetMap.put(topicAndPartition, nextOffset) debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " + - nextOffset + " for " + topicAndPartition) + nextOffset + " for " + topicAndPartition) } } if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) { @@ -339,10 +341,10 @@ private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartiti extends ShutdownableThread(name) { val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId) val fetchRequestBuilder = new FetchRequestBuilder(). - clientId(ReplicaVerificationTool.clientId). - replicaId(Request.DebuggingConsumerId). - maxWait(maxWait). - minBytes(minBytes) + clientId(ReplicaVerificationTool.clientId). + replicaId(Request.DebuggingConsumerId). + maxWait(maxWait). 
+ minBytes(minBytes) override def doWork() { @@ -367,7 +369,7 @@ private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartiti if (response != null) { response.data.foreach { - case(topicAndPartition, partitionData) => + case (topicAndPartition, partitionData) => replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData) } } else { diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index c2ba07c..5e50bb2 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -127,7 +127,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch] leaderAndIsr.put((topic, partitionId), new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2)) - val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, Set(0,1))).toMap + val partitionStateInfo = leaderAndIsr.map{case(topicAndPartitionId, l) => topicAndPartitionId -> new PartitionStateInfo(l, Set(0,1))}.toMap + val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch, 0, "") -- 1.9.3 (Apple Git-50) From 40ece49363d9f8b848e8e0d80d0b51c8a75ebb1b Mon Sep 17 00:00:00 2001 From: mgharat Date: Fri, 29 Aug 2014 09:40:36 -0700 Subject: [PATCH 2/4] Added comments explaining the changes and reverted back some changes as per comments on the reviewboard --- .../kafka/admin/ReassignPartitionsCommand.scala | 5 +- .../scala/kafka/controller/KafkaController.scala | 3 +- core/src/main/scala/kafka/log/LogManager.scala | 2 +- .../src/main/scala/kafka/server/DelayedFetch.scala | 6 +- .../main/scala/kafka/server/DelayedProduce.scala | 4 +- core/src/main/scala/kafka/server/KafkaApis.scala | 9 ++- core/src/main/scala/kafka/server/KafkaServer.scala | 1 + .../kafka/tools/ReplicaVerificationTool.scala | 77 +++++++++++----------- .../unit/kafka/server/LeaderElectionTest.scala | 2 +- 9 files changed, 61 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 25ecc9e..8eeb831 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -17,7 +17,6 @@ package kafka.admin import joptsimple.OptionParser -import kafka.controller.ReassignedPartitionsContext import kafka.utils._ import collection._ import org.I0Itec.zkclient.ZkClient @@ -121,7 +120,9 @@ object ReassignPartitionsCommand extends Logging { private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) :Map[TopicAndPartition, ReassignmentStatus] = { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).map(p => p._1 -> p._2.newReplicas) + //Changing mapValues to map since this function returns a mapping of TopicAnPartition to ReassignmentStatus + //and ReassignmentStatus depends on partitionsBeingReassigned + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).map{ case (topicAndPartition, reassignedPartitionsContext) => topicAndPartition -> reassignedPartitionsContext.newReplicas } partitionsToBeReassigned.map { topicAndPartition => (topicAndPartition._1, 
checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 3670530..5811c34 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -929,7 +929,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // remove this partition from that list val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition // write the new list to zookeeper - ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.map(p => p._1 -> p._2.newReplicas)) + //Changing the mapValues to map since any change in TopicAndPartition or newReplicas will make the writes inconsistent + ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.map{ case (topicAndPartition, reassignedPartitionsContext) => topicAndPartition -> reassignedPartitionsContext.newReplicas }) // update the cache. NO-OP if the partition's reassignment was never started controllerContext.partitionsBeingReassigned.remove(topicAndPartition) } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 990a5ca..4d2924d 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -314,7 +314,7 @@ class LogManager(val logDirs: Array[File], private def checkpointLogsInDir(dir: File): Unit = { val recoveryPoints = this.logsByDir.get(dir.toString) if (recoveryPoints.isDefined) { - this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.map(p => p._1 -> p._2.recoveryPoint)) + this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) } } diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index f29522a..898dd30 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -17,6 +17,7 @@ package kafka.server +import kafka.common import kafka.network.RequestChannel import kafka.api.{FetchResponse, FetchRequest} import kafka.common.{UnknownTopicOrPartitionException, NotLeaderForPartitionException, TopicAndPartition} @@ -63,7 +64,7 @@ class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], debug("Satisfying fetch request %s since it is fetching later segments of partition %s.".format(fetch, topicAndPartition)) return true } else if (fetchOffset.offsetOnOlderSegment(endOffset)) { - // Case C, this can happen when the folloer replica is lagging too much + // Case C, this can happen when the follower replica is lagging too much debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch)) return true } else if (fetchOffset.precedes(endOffset)) { @@ -86,6 +87,7 @@ class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], def respond(replicaManager: ReplicaManager): FetchResponse = { val topicData = replicaManager.readMessageSets(fetch) - FetchResponse(fetch.correlationId, topicData.map(p => p._1 -> p._2.data)) + //Changing the mapValues to map to avoid any changes to a constructed response due to changes in the topic data + FetchResponse(fetch.correlationId, topicData.map{ case (topicAndPartition, partitionDataAndOffset) => 
topicAndPartition -> partitionDataAndOffset.data }) } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 84a63d1..79138eb 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -57,7 +57,9 @@ class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey], } def respond(offsetManager: OffsetManager): RequestOrResponse = { - val responseStatus = partitionStatus.map(topicPartitionAndResponseStatus => topicPartitionAndResponseStatus._1 -> topicPartitionAndResponseStatus._2.responseStatus) + //Changing mapValues to map since the responseStatus is used in construction of the ProducerResponse + val responseStatus = partitionStatus + .map{ case (topicAndPartition, delayedProduceResponseStatus) => topicAndPartition -> delayedProduceResponseStatus.responseStatus} val errorCode = responseStatus.find { case (_, status) => status.error != ErrorMapping.NoError diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ccf1da3..1b2756e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -303,8 +303,9 @@ class KafkaApis(val requestChannel: RequestChannel, // if the fetch request comes from the follower, // update its corresponding log end offset + //Changing mapValues to map to avoid any inconsistency when the log end offset is been updated if(fetchRequest.isFromFollower) - recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.map(p => p._1 -> p._2.offset)) + recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.offset }) // check if this fetch request can be satisfied right away val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum @@ -320,14 +321,16 @@ class KafkaApis(val requestChannel: RequestChannel, errorReadingData) { debug("Returning fetch response %s for fetch request with correlation id %d to client %s" .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) - val response = new FetchResponse(fetchRequest.correlationId, dataRead.map(tup => tup._1->tup._2.data)) + //Changing mapValues to map since the (TopicAndPartition -> ResponsePartitionData) is used in FetchResponse for processing the response + val response = new FetchResponse(fetchRequest.correlationId, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.data }) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId, fetchRequest.clientId)) // create a list of (topic, partition) pairs to use as keys for this delayed request val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_)) - val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.map(tup => tup._1 -> tup._2.offset)) + //Changing mapValues to map since the partitionOffsets are passed and used in DelayedFetch + val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => 
topicAndPartition -> partitionDataAndOffset.offset }) // add the fetch request for watch if it's not satisfied, otherwise send the response back val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) if (satisfiedByMe) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 3eb7962..7359e55 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -289,6 +289,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg minCleanableRatio = config.logCleanerMinCleanRatio, compact = config.logCleanupPolicy.trim.toLowerCase == "compact") val defaultProps = defaultLogConfig.toProps + //Changing from mapValues to map since configs is passed to LogManager and any changes in the properties may affect the LogManager val configs = AdminUtils.fetchAllTopicConfigs(zkClient).map(topicConfigs => topicConfigs._1 -> LogConfig.fromProps(defaultProps, topicConfigs._2)) // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 71e82f6..76e9ec0 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -36,10 +36,10 @@ import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer} * * 1. start a fetcher on every broker. * 2. each fetcher does the following - * 2.1 issues fetch request - * 2.2 puts the fetched result in a shared buffer - * 2.3 waits for all other fetchers to finish step 2.2 - * 2.4 one of the fetchers verifies the consistency of fetched results among replicas + * 2.1 issues fetch request + * 2.2 puts the fetched result in a shared buffer + * 2.3 waits for all other fetchers to finish step 2.2 + * 2.4 one of the fetchers verifies the consistency of fetched results among replicas * * The consistency verification is up to the high watermark. The tool reports the * max lag between the verified offset and the high watermark among all partitions. @@ -64,39 +64,39 @@ object ReplicaVerificationTool extends Logging { def main(args: Array[String]): Unit = { val parser = new OptionParser val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") - .withRequiredArg - .describedAs("hostname:port,...,hostname:port") - .ofType(classOf[String]) + .withRequiredArg + .describedAs("hostname:port,...,hostname:port") + .ofType(classOf[String]) val fetchSizeOpt = parser.accepts("fetch-size", "The fetch size of each request.") - .withRequiredArg - .describedAs("bytes") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.FetchSize) + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(ConsumerConfig.FetchSize) val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1000) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1000) val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. 
Defaults to all topics.") - .withRequiredArg - .describedAs("Java regex (String)") - .ofType(classOf[String]) - .defaultsTo(".*") + .withRequiredArg + .describedAs("Java regex (String)") + .ofType(classOf[String]) + .defaultsTo(".*") val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.") - .withRequiredArg - .describedAs("timestamp/-1(latest)/-2(earliest)") - .ofType(classOf[java.lang.Long]) - .defaultsTo(-1L) + .withRequiredArg + .describedAs("timestamp/-1(latest)/-2(earliest)") + .ofType(classOf[java.lang.Long]) + .defaultsTo(-1L) val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Long]) - .defaultsTo(30 * 1000L) + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(30 * 1000L) - if (args.length == 0) + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.") - val options = parser.parse(args: _*) + val options = parser.parse(args : _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) val regex = options.valueOf(topicWhiteListOpt) @@ -136,21 +136,24 @@ object ReplicaVerificationTool extends Logging { debug("Selected topic partitions: " + topicPartitionReplicaList) val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId) .map { case (brokerId, partitions) => - brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId)} - } + brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } } + debug("Topic partitions per broker: " + topicAndPartitionsPerBroker) val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) - .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size} + .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) + //Changing mapValues to map since the leadersPerBroker is used to create a ReplicaBuffer which + //is used to create replica fetcher threads. 
This change will avoid any inconsistency which may arise from the use of mapValues val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.map( partitionMetadata => (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)) ).groupBy(_._2) - .map {case (leaderId,topicAndPartitionAndLeaderIds) => leaderId -> topicAndPartitionAndLeaderIds.map{case(topicAndPartition, leaderId) => topicAndPartition}} + .map { case (leaderId, topicAndPartitionAndLeaderIds) => + leaderId -> topicAndPartitionAndLeaderIds.map { case (topicAndPartition, leaderId) => topicAndPartition } } debug("Leaders per broker: " + leadersPerBroker) val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, @@ -267,15 +270,15 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") val messageIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) => - replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator - } + replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator} + val maxHw = fetchResponsePerReplica.values.map(_.hw).max // Iterate one message at a time from every replica, until high watermark is reached. var isMessageInAllReplicas = true while (isMessageInAllReplicas) { var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None - for ((replicaId, messageIterator) <- messageIteratorMap) { + for ( (replicaId, messageIterator) <- messageIteratorMap) { try { if (messageIterator.hasNext) { val messageAndOffset = messageIterator.next() @@ -315,7 +318,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset fetchOffsetMap.put(topicAndPartition, nextOffset) debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " + - nextOffset + " for " + topicAndPartition) + nextOffset + " for " + topicAndPartition) } } if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) { diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 5e50bb2..bf18874 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -127,7 +127,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch] leaderAndIsr.put((topic, partitionId), new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2)) - val partitionStateInfo = leaderAndIsr.map{case(topicAndPartitionId, l) => topicAndPartitionId -> new PartitionStateInfo(l, Set(0,1))}.toMap + val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, Set(0,1))).toMap val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch, 0, "") -- 1.9.3 (Apple Git-50) From 592ac287bcd31991fe11481a6621bd02fc81c43a Mon Sep 17 00:00:00 2001 From: mgharat Date: Fri, 29 Aug 2014 09:54:53 -0700 Subject: [PATCH 3/4] Removed the unnecessary import --- core/src/main/scala/kafka/server/DelayedFetch.scala | 1 - 1 file changed, 1 deletion(-) diff --git 
a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 898dd30..6fa3384 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.common import kafka.network.RequestChannel import kafka.api.{FetchResponse, FetchRequest} import kafka.common.{UnknownTopicOrPartitionException, NotLeaderForPartitionException, TopicAndPartition} -- 1.9.3 (Apple Git-50) From e80c75b11fd5453fe628c50e7c8b74c015b0133d Mon Sep 17 00:00:00 2001 From: mgharat Date: Wed, 3 Sep 2014 11:25:33 -0700 Subject: [PATCH 4/4] Made changes to comments as per the suggestions on the reviewboard --- core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala | 2 +- core/src/main/scala/kafka/controller/KafkaController.scala | 2 +- core/src/main/scala/kafka/server/DelayedFetch.scala | 2 +- core/src/main/scala/kafka/server/DelayedProduce.scala | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++--- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala | 4 ++-- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 8eeb831..b3385e5 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -120,7 +120,7 @@ object ReassignPartitionsCommand extends Logging { private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) :Map[TopicAndPartition, ReassignmentStatus] = { - //Changing mapValues to map since this function returns a mapping of TopicAnPartition to ReassignmentStatus + //Create a new collection with map, since the function returns a mapping of TopicAndPartition to ReassignmentStatus //and ReassignmentStatus depends on partitionsBeingReassigned val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).map{ case (topicAndPartition, reassignedPartitionsContext) => topicAndPartition -> reassignedPartitionsContext.newReplicas } partitionsToBeReassigned.map { topicAndPartition => diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 5811c34..7b6117f 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -929,7 +929,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // remove this partition from that list val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition // write the new list to zookeeper - //Changing the mapValues to map since any change in TopicAndPartition or newReplicas will make the writes inconsistent + //Create a new collection with map since any change in TopicAndPartition or newReplicas will make the writes inconsistent ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.map{ case (topicAndPartition, reassignedPartitionsContext) => topicAndPartition -> reassignedPartitionsContext.newReplicas }) // update the cache. 
NO-OP if the partition's reassignment was never started controllerContext.partitionsBeingReassigned.remove(topicAndPartition) diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 6fa3384..96d584f 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -86,7 +86,7 @@ class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], def respond(replicaManager: ReplicaManager): FetchResponse = { val topicData = replicaManager.readMessageSets(fetch) - //Changing the mapValues to map to avoid any changes to a constructed response due to changes in the topic data + //Create a new collection with map to avoid any changes to a constructed response due to changes in the topic data FetchResponse(fetch.correlationId, topicData.map{ case (topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.data }) } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 79138eb..ff4df53 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -57,7 +57,7 @@ class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey], } def respond(offsetManager: OffsetManager): RequestOrResponse = { - //Changing mapValues to map since the responseStatus is used in construction of the ProducerResponse + //Create a new collection with map since responseStatus is used in construction of the ProducerResponse val responseStatus = partitionStatus .map{ case (topicAndPartition, delayedProduceResponseStatus) => topicAndPartition -> delayedProduceResponseStatus.responseStatus} diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1b2756e..1224530 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -303,7 +303,7 @@ class KafkaApis(val requestChannel: RequestChannel, // if the fetch request comes from the follower, // update its corresponding log end offset - //Changing mapValues to map to avoid any inconsistency when the log end offset is been updated + //Create a new collection with map to avoid any inconsistency when the log end offset is been updated if(fetchRequest.isFromFollower) recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.offset }) @@ -321,7 +321,7 @@ class KafkaApis(val requestChannel: RequestChannel, errorReadingData) { debug("Returning fetch response %s for fetch request with correlation id %d to client %s" .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) - //Changing mapValues to map since the (TopicAndPartition -> ResponsePartitionData) is used in FetchResponse for processing the response + //Create a new collection with map since the (TopicAndPartition -> ResponsePartitionData) is used in FetchResponse for processing the response val response = new FetchResponse(fetchRequest.correlationId, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.data }) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { @@ -329,7 +329,7 @@ class KafkaApis(val requestChannel: 
RequestChannel, fetchRequest.clientId)) // create a list of (topic, partition) pairs to use as keys for this delayed request val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_)) - //Changing mapValues to map since the partitionOffsets are passed and used in DelayedFetch + //Create a new collection with map since the partitionOffsets are passed and used in DelayedFetch val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.offset }) // add the fetch request for watch if it's not satisfied, otherwise send the response back val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 7359e55..43cb8c8 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -289,7 +289,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg minCleanableRatio = config.logCleanerMinCleanRatio, compact = config.logCleanupPolicy.trim.toLowerCase == "compact") val defaultProps = defaultLogConfig.toProps - //Changing from mapValues to map since configs is passed to LogManager and any changes in the properties may affect the LogManager + //Create a new collection with map since configs is passed to LogManager and any changes in the properties may affect the LogManager val configs = AdminUtils.fetchAllTopicConfigs(zkClient).map(topicConfigs => topicConfigs._1 -> LogConfig.fromProps(defaultProps, topicConfigs._2)) // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 76e9ec0..c007982 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -144,8 +144,8 @@ object ReplicaVerificationTool extends Logging { .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) - //Changing mapValues to map since the leadersPerBroker is used to create a ReplicaBuffer which - //is used to create replica fetcher threads. This change will avoid any inconsistency which may arise from the use of mapValues + //Create a new collection with map to avoid any inconsistency, since the leadersPerBroker is used to create a ReplicaBuffer which + //is used to create replica fetcher threads. val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.map( -- 1.9.3 (Apple Git-50)
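
For reference, the behavioural difference that motivates this series can be reproduced with a few lines of standalone Scala, independent of the Kafka code above (the names underlying, lazyView and snapshot are illustrative only and do not appear in the patches). In Scala 2.x, Map.mapValues returns a lazy view that keeps a reference to the original map and re-applies the function on every lookup, so later modifications to that map show through; map builds a new, strict collection and therefore takes a stable snapshot.

    import scala.collection.mutable

    object MapValuesVsMap extends App {
      // A mutable map standing in for broker-side state such as
      // controllerContext.partitionsBeingReassigned.
      val underlying = mutable.Map("a" -> 1, "b" -> 2)

      // mapValues: a lazy view over `underlying`; the function is
      // re-evaluated on every lookup, so later mutations show through.
      val lazyView = underlying.mapValues(_ * 10)

      // map: a new, strict collection; the values are computed once,
      // giving a stable snapshot that is safe to hand to other components.
      val snapshot = underlying.map { case (k, v) => k -> v * 10 }

      underlying("a") = 100   // mutate the source afterwards

      println(lazyView("a"))  // 1000 -- the view tracks the mutation
      println(snapshot("a"))  // 10   -- the snapshot is unaffected
    }

This is why the patches keep mapValues only where the result is consumed immediately and locally, and switch to map wherever the result is stored, passed to another thread or component, or written to ZooKeeper.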