From 12eb01c4f57b9c4c3e89bb93bcdecc83c043bcaa Mon Sep 17 00:00:00 2001 From: mgharat Date: Thu, 4 Sep 2014 18:43:18 -0700 Subject: [PATCH 1/5] reapply patch --- .../kafka/admin/ReassignPartitionsCommand.scala | 4 +- .../scala/kafka/controller/KafkaController.scala | 3 +- .../src/main/scala/kafka/server/DelayedFetch.scala | 5 +- .../main/scala/kafka/server/DelayedProduce.scala | 4 +- core/src/main/scala/kafka/server/KafkaApis.scala | 11 +- core/src/main/scala/kafka/server/KafkaServer.scala | 3 +- .../kafka/tools/ReplicaVerificationTool.scala | 113 +++++++++++---------- .../unit/kafka/server/LeaderElectionTest.scala | 1 + 8 files changed, 79 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 691d69a..b3385e5 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -120,7 +120,9 @@ object ReassignPartitionsCommand extends Logging { private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) :Map[TopicAndPartition, ReassignmentStatus] = { - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) + //Create a new collection with map, since the function returns a mapping of TopicAndPartition to ReassignmentStatus + //and ReassignmentStatus depends on partitionsBeingReassigned + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).map{ case (topicAndPartition, reassignedPartitionsContext) => topicAndPartition -> reassignedPartitionsContext.newReplicas } partitionsToBeReassigned.map { topicAndPartition => (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 8ab4a1b..7b6117f 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -929,7 +929,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // remove this partition from that list val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition // write the new list to zookeeper - ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) + //Create a new collection with map since any change in TopicAndPartition or newReplicas will make the writes inconsistent + ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.map{ case (topicAndPartition, reassignedPartitionsContext) => topicAndPartition -> reassignedPartitionsContext.newReplicas }) // update the cache. NO-OP if the partition's reassignment was never started controllerContext.partitionsBeingReassigned.remove(topicAndPartition) } diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index e0f14e2..96d584f 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -63,7 +63,7 @@ class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], debug("Satisfying fetch request %s since it is fetching later segments of partition %s.".format(fetch, topicAndPartition)) return true } else if (fetchOffset.offsetOnOlderSegment(endOffset)) { - // Case C, this can happen when the folloer replica is lagging too much + // Case C, this can happen when the follower replica is lagging too much debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch)) return true } else if (fetchOffset.precedes(endOffset)) { @@ -86,6 +86,7 @@ class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], def respond(replicaManager: ReplicaManager): FetchResponse = { val topicData = replicaManager.readMessageSets(fetch) - FetchResponse(fetch.correlationId, topicData.mapValues(_.data)) + //Create a new collection with map to avoid any changes to a constructed response due to changes in the topic data + FetchResponse(fetch.correlationId, topicData.map{ case (topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.data }) } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 9481508..ff4df53 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -57,7 +57,9 @@ class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey], } def respond(offsetManager: OffsetManager): RequestOrResponse = { - val responseStatus = partitionStatus.mapValues(status => status.responseStatus) + //Create a new collection with map since responseStatus is used in construction of the ProducerResponse + val responseStatus = partitionStatus + .map{ case (topicAndPartition, delayedProduceResponseStatus) => topicAndPartition -> delayedProduceResponseStatus.responseStatus} val errorCode = responseStatus.find { case (_, status) => status.error != ErrorMapping.NoError diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c584b55..1224530 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -303,8 +303,9 @@ class KafkaApis(val requestChannel: RequestChannel, // if the fetch request comes from the follower, // update its corresponding log end offset + //Create a new collection with map to avoid any inconsistency when the log end offset is been updated if(fetchRequest.isFromFollower) - recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset)) + recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.offset }) // check if this fetch request can be satisfied right away val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum @@ -320,16 +321,16 @@ class KafkaApis(val requestChannel: RequestChannel, errorReadingData) { debug("Returning fetch response %s for fetch request with correlation id %d to client %s" .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) - val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data)) + //Create a new collection with map since the (TopicAndPartition -> ResponsePartitionData) is used in FetchResponse for processing the response + val response = new FetchResponse(fetchRequest.correlationId, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.data }) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId, fetchRequest.clientId)) // create a list of (topic, partition) pairs to use as keys for this delayed request val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_)) - val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, - dataRead.mapValues(_.offset)) - + //Create a new collection with map since the partitionOffsets are passed and used in DelayedFetch + val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.offset }) // add the fetch request for watch if it's not satisfied, otherwise send the response back val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) if (satisfiedByMe) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 3e9e91f..a127bf7 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -323,7 +323,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg minCleanableRatio = config.logCleanerMinCleanRatio, compact = config.logCleanupPolicy.trim.toLowerCase == "compact") val defaultProps = defaultLogConfig.toProps - val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) + //Create a new collection with map since configs is passed to LogManager and any changes in the properties may affect the LogManager + val configs = AdminUtils.fetchAllTopicConfigs(zkClient).map(topicConfigs => topicConfigs._1 -> LogConfig.fromProps(defaultProps, topicConfigs._2)) // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, dedupeBufferSize = config.logCleanerDedupeBufferSize, diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index ba6ddd7..3865661 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -32,20 +32,20 @@ import kafka.utils._ import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer} /** - * For verifying the consistency among replicas. + * For verifying the consistency among replicas. * - * 1. start a fetcher on every broker. - * 2. each fetcher does the following - * 2.1 issues fetch request - * 2.2 puts the fetched result in a shared buffer - * 2.3 waits for all other fetchers to finish step 2.2 - * 2.4 one of the fetchers verifies the consistency of fetched results among replicas + * 1. start a fetcher on every broker. + * 2. each fetcher does the following + * 2.1 issues fetch request + * 2.2 puts the fetched result in a shared buffer + * 2.3 waits for all other fetchers to finish step 2.2 + * 2.4 one of the fetchers verifies the consistency of fetched results among replicas * - * The consistency verification is up to the high watermark. The tool reports the - * max lag between the verified offset and the high watermark among all partitions. + * The consistency verification is up to the high watermark. The tool reports the + * max lag between the verified offset and the high watermark among all partitions. * - * If a broker goes down, the verification of the partitions on that broker is delayed - * until the broker is up again. + * If a broker goes down, the verification of the partitions on that broker is delayed + * until the broker is up again. * * Caveats: * 1. The tools needs all brokers to be up at startup time. @@ -53,7 +53,7 @@ import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer} */ object ReplicaVerificationTool extends Logging { - val clientId= "replicaVerificationTool" + val clientId = "replicaVerificationTool" val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS" val dateFormat = new SimpleDateFormat(dateFormatString) @@ -83,17 +83,17 @@ object ReplicaVerificationTool extends Logging { .ofType(classOf[String]) .defaultsTo(".*") val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.") - .withRequiredArg - .describedAs("timestamp/-1(latest)/-2(earliest)") - .ofType(classOf[java.lang.Long]) - .defaultsTo(-1L) + .withRequiredArg + .describedAs("timestamp/-1(latest)/-2(earliest)") + .ofType(classOf[java.lang.Long]) + .defaultsTo(-1L) val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.") .withRequiredArg .describedAs("ms") .ofType(classOf[java.lang.Long]) .defaultsTo(30 * 1000L) - - if(args.length == 0) + + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.") val options = parser.parse(args : _*) @@ -122,10 +122,10 @@ object ReplicaVerificationTool extends Logging { val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( - topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) - true - else - false + topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) + true + else + false ) val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap( topicMetadataResponse => @@ -138,42 +138,46 @@ object ReplicaVerificationTool extends Logging { debug("Selected topic partitions: " + topicPartitionReplicaList) val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId) .map { case (brokerId, partitions) => - brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } } + brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } } + debug("Topic partitions per broker: " + topicAndPartitionsPerBroker) val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = - topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) - .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } + topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica.topic, replica.partitionId)) + .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) + + //Create a new collection with map to avoid any inconsistency, since the leadersPerBroker is used to create a ReplicaBuffer which + //is used to create replica fetcher threads. val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.map( partitionMetadata => (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)) ).groupBy(_._2) - .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { - case(topicAndPartition, leaderId) => topicAndPartition }) + .map { case (leaderId, topicAndPartitionAndLeaderIds) => + leaderId -> topicAndPartitionAndLeaderIds.map { case (topicAndPartition, leaderId) => topicAndPartition } } debug("Leaders per broker: " + leadersPerBroker) val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, - leadersPerBroker, - topicAndPartitionsPerBroker.size, - brokerMap, - initialOffsetTime, - reportInterval) + leadersPerBroker, + topicAndPartitionsPerBroker.size, + brokerMap, + initialOffsetTime, + reportInterval) // create all replica fetcher threads val verificationBrokerId = topicAndPartitionsPerBroker.head._1 val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map { case (brokerId, topicAndPartitions) => new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId, - sourceBroker = brokerMap(brokerId), - topicAndPartitions = topicAndPartitions, - replicaBuffer = replicaBuffer, - socketTimeout = 30000, - socketBufferSize = 256000, - fetchSize = fetchSize, - maxWait = maxWaitMs, - minBytes = 1, - doVerification = (brokerId == verificationBrokerId)) + sourceBroker = brokerMap(brokerId), + topicAndPartitions = topicAndPartitions, + replicaBuffer = replicaBuffer, + socketTimeout = 30000, + socketBufferSize = 256000, + fetchSize = fetchSize, + maxWait = maxWaitMs, + minBytes = 1, + doVerification = (brokerId == verificationBrokerId)) } Runtime.getRuntime.addShutdownHook(new Thread() { @@ -188,7 +192,7 @@ object ReplicaVerificationTool extends Logging { } } -private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) +private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) private case class ReplicaAndMessageIterator(replicaId: Int, iterator: Iterator[MessageAndOffset]) @@ -243,7 +247,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa val offsetRequest = OffsetRequest(initialOffsetMap) val offsetResponse = consumer.getOffsetsBefore(offsetRequest) assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse)) - offsetResponse.partitionErrorAndOffsets.foreach{ + offsetResponse.partitionErrorAndOffsets.foreach { case (topicAndPartition, partitionOffsetResponse) => fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head) } @@ -264,11 +268,12 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) { debug("Verifying " + topicAndPartition) assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition), - "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " - + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") + "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " + + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") val messageIteratorMap = fetchResponsePerReplica.map { - case(replicaId, fetchResponse) => + case (replicaId, fetchResponse) => replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator} + val maxHw = fetchResponsePerReplica.values.map(_.hw).max // Iterate one message at a time from every replica, until high watermark is reached. @@ -287,7 +292,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa messageInfoFromFirstReplicaOpt match { case None => messageInfoFromFirstReplicaOpt = Some( - MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum)) + MessageInfo(replicaId, messageAndOffset.offset, messageAndOffset.nextOffset, messageAndOffset.message.checksum)) case Some(messageInfoFromFirstReplica) => if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) { println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition @@ -308,7 +313,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } catch { case t: Throwable => throw new RuntimeException("Error in processing replica %d in partition %s at offset %d." - .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) + .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) } } if (isMessageInAllReplicas) { @@ -341,10 +346,10 @@ private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartiti extends ShutdownableThread(name) { val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId) val fetchRequestBuilder = new FetchRequestBuilder(). - clientId(ReplicaVerificationTool.clientId). - replicaId(Request.DebuggingConsumerId). - maxWait(maxWait). - minBytes(minBytes) + clientId(ReplicaVerificationTool.clientId). + replicaId(Request.DebuggingConsumerId). + maxWait(maxWait). + minBytes(minBytes) override def doWork() { @@ -369,7 +374,7 @@ private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartiti if (response != null) { response.data.foreach { - case(topicAndPartition, partitionData) => + case (topicAndPartition, partitionData) => replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData) } } else { diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index c2ba07c..bf18874 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -128,6 +128,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { leaderAndIsr.put((topic, partitionId), new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2)) val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, Set(0,1))).toMap + val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch, 0, "") -- 1.9.3 (Apple Git-50) From 32ec2f99bf1bd826aa004180e4b304125dc6f784 Mon Sep 17 00:00:00 2001 From: mgharat Date: Tue, 16 Sep 2014 11:31:44 -0700 Subject: [PATCH 2/5] Reverting the changes and adding comments to make the usage of mapValues more clear --- .../main/scala/kafka/admin/ReassignPartitionsCommand.scala | 5 ++--- core/src/main/scala/kafka/controller/KafkaController.scala | 4 ++-- core/src/main/scala/kafka/server/DelayedFetch.scala | 4 ++-- core/src/main/scala/kafka/server/DelayedProduce.scala | 5 ++--- core/src/main/scala/kafka/server/KafkaApis.scala | 12 ++++++------ core/src/main/scala/kafka/server/KafkaServer.scala | 4 ++-- .../src/main/scala/kafka/tools/ReplicaVerificationTool.scala | 9 ++++----- 7 files changed, 20 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index b3385e5..1154b29 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -120,9 +120,8 @@ object ReassignPartitionsCommand extends Logging { private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) :Map[TopicAndPartition, ReassignmentStatus] = { - //Create a new collection with map, since the function returns a mapping of TopicAndPartition to ReassignmentStatus - //and ReassignmentStatus depends on partitionsBeingReassigned - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).map{ case (topicAndPartition, reassignedPartitionsContext) => topicAndPartition -> reassignedPartitionsContext.newReplicas } + //creating a view of partitions being reassigned to be passed to checkIfPartitionReassignmentSucceeded + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) partitionsToBeReassigned.map { topicAndPartition => (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 7b6117f..7f4841e 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -929,8 +929,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // remove this partition from that list val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition // write the new list to zookeeper - //Create a new collection with map since any change in TopicAndPartition or newReplicas will make the writes inconsistent - ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.map{ case (topicAndPartition, reassignedPartitionsContext) => topicAndPartition -> reassignedPartitionsContext.newReplicas }) + //creating a view of updatedPartitionsBeingReassigned to be passed to updatePartitionReassignmentData in ZkUtils + ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) // update the cache. NO-OP if the partition's reassignment was never started controllerContext.partitionsBeingReassigned.remove(topicAndPartition) } diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 96d584f..88a8017 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -86,7 +86,7 @@ class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], def respond(replicaManager: ReplicaManager): FetchResponse = { val topicData = replicaManager.readMessageSets(fetch) - //Create a new collection with map to avoid any changes to a constructed response due to changes in the topic data - FetchResponse(fetch.correlationId, topicData.map{ case (topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.data }) + //creating a view of topicData for creating a FetchResponse + FetchResponse(fetch.correlationId, topicData.mapValues(_.data)) } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index ff4df53..34bcbcc 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -57,9 +57,8 @@ class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey], } def respond(offsetManager: OffsetManager): RequestOrResponse = { - //Create a new collection with map since responseStatus is used in construction of the ProducerResponse - val responseStatus = partitionStatus - .map{ case (topicAndPartition, delayedProduceResponseStatus) => topicAndPartition -> delayedProduceResponseStatus.responseStatus} + //creating a view of (TopicAndPartition -> responseStatus) from partitionStatus, used for constructing the ProducerResponse + val responseStatus = partitionStatus.mapValues(status => status.responseStatus) val errorCode = responseStatus.find { case (_, status) => status.error != ErrorMapping.NoError diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1224530..b6a26f9 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -303,9 +303,9 @@ class KafkaApis(val requestChannel: RequestChannel, // if the fetch request comes from the follower, // update its corresponding log end offset - //Create a new collection with map to avoid any inconsistency when the log end offset is been updated + //creating a view of (TopicAndPartition -> offset) from dataRead to be passed to recordFollowerLogEndOffsets if(fetchRequest.isFromFollower) - recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.offset }) + recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset)) // check if this fetch request can be satisfied right away val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum @@ -321,16 +321,16 @@ class KafkaApis(val requestChannel: RequestChannel, errorReadingData) { debug("Returning fetch response %s for fetch request with correlation id %d to client %s" .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) - //Create a new collection with map since the (TopicAndPartition -> ResponsePartitionData) is used in FetchResponse for processing the response - val response = new FetchResponse(fetchRequest.correlationId, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.data }) + //creating a view of (TopicAndPartition -> data) from dataRead for creating a fetch response + val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data)) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId, fetchRequest.clientId)) // create a list of (topic, partition) pairs to use as keys for this delayed request val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_)) - //Create a new collection with map since the partitionOffsets are passed and used in DelayedFetch - val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.map{ case(topicAndPartition, partitionDataAndOffset) => topicAndPartition -> partitionDataAndOffset.offset }) + //creating a view of (TopicAndPartition -> offset) from dataRead for constructing a DelayedFetch request + val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.mapValues(_.offset)) // add the fetch request for watch if it's not satisfied, otherwise send the response back val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) if (satisfiedByMe) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index a127bf7..6d7da4d 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -323,8 +323,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg minCleanableRatio = config.logCleanerMinCleanRatio, compact = config.logCleanupPolicy.trim.toLowerCase == "compact") val defaultProps = defaultLogConfig.toProps - //Create a new collection with map since configs is passed to LogManager and any changes in the properties may affect the LogManager - val configs = AdminUtils.fetchAllTopicConfigs(zkClient).map(topicConfigs => topicConfigs._1 -> LogConfig.fromProps(defaultProps, topicConfigs._2)) + //creating a view of topic configs used for creating LogManager + val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps,_)) // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, dedupeBufferSize = config.logCleanerDedupeBufferSize, diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 3865661..766ab3b 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -138,7 +138,7 @@ object ReplicaVerificationTool extends Logging { debug("Selected topic partitions: " + topicPartitionReplicaList) val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId) .map { case (brokerId, partitions) => - brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } } + brokerId -> partitions.map { case partition => new TopicAndPartition(partition.topic, partition.partitionId) } } debug("Topic partitions per broker: " + topicAndPartitionsPerBroker) val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = @@ -146,16 +146,15 @@ object ReplicaVerificationTool extends Logging { .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) - //Create a new collection with map to avoid any inconsistency, since the leadersPerBroker is used to create a ReplicaBuffer which - //is used to create replica fetcher threads. + //creating a view of leadersPerBroker(LeaderId -> Seq[TopicAndPartition]) for creating a ReplicaBuffer val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.map( partitionMetadata => (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)) ).groupBy(_._2) - .map { case (leaderId, topicAndPartitionAndLeaderIds) => - leaderId -> topicAndPartitionAndLeaderIds.map { case (topicAndPartition, leaderId) => topicAndPartition } } + .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { + case(topicAndPartition, leaderId) => topicAndPartition }) debug("Leaders per broker: " + leadersPerBroker) val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, -- 1.9.3 (Apple Git-50) From a01dba522cb25b3c183ba0c93d7a3eec5e3deb07 Mon Sep 17 00:00:00 2001 From: mgharat Date: Tue, 16 Sep 2014 15:23:11 -0700 Subject: [PATCH 3/5] Formatted the comments --- core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala | 2 +- core/src/main/scala/kafka/controller/KafkaController.scala | 2 +- core/src/main/scala/kafka/server/DelayedFetch.scala | 2 +- core/src/main/scala/kafka/server/DelayedProduce.scala | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++--- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala | 2 +- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 1154b29..a6d42c6 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -120,7 +120,7 @@ object ReassignPartitionsCommand extends Logging { private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) :Map[TopicAndPartition, ReassignmentStatus] = { - //creating a view of partitions being reassigned to be passed to checkIfPartitionReassignmentSucceeded + // creating a view of partitions being reassigned to be passed to checkIfPartitionReassignmentSucceeded val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) partitionsToBeReassigned.map { topicAndPartition => (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 7f4841e..41c5f00 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -929,7 +929,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // remove this partition from that list val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition // write the new list to zookeeper - //creating a view of updatedPartitionsBeingReassigned to be passed to updatePartitionReassignmentData in ZkUtils + // creating a view of updatedPartitionsBeingReassigned to be passed to updatePartitionReassignmentData in ZkUtils ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) // update the cache. NO-OP if the partition's reassignment was never started controllerContext.partitionsBeingReassigned.remove(topicAndPartition) diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 88a8017..5677084 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -86,7 +86,7 @@ class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], def respond(replicaManager: ReplicaManager): FetchResponse = { val topicData = replicaManager.readMessageSets(fetch) - //creating a view of topicData for creating a FetchResponse + // creating a view of topicData for creating a FetchResponse FetchResponse(fetch.correlationId, topicData.mapValues(_.data)) } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 34bcbcc..f613915 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -57,7 +57,7 @@ class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey], } def respond(offsetManager: OffsetManager): RequestOrResponse = { - //creating a view of (TopicAndPartition -> responseStatus) from partitionStatus, used for constructing the ProducerResponse + // creating a responseStatusView from partitionStatus, used for constructing the ProducerResponse val responseStatus = partitionStatus.mapValues(status => status.responseStatus) val errorCode = responseStatus.find { case (_, status) => diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b6a26f9..7d5426a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -303,7 +303,7 @@ class KafkaApis(val requestChannel: RequestChannel, // if the fetch request comes from the follower, // update its corresponding log end offset - //creating a view of (TopicAndPartition -> offset) from dataRead to be passed to recordFollowerLogEndOffsets + // creating a view of (TopicAndPartition -> offset) from dataRead to be passed to recordFollowerLogEndOffsets if(fetchRequest.isFromFollower) recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset)) @@ -321,7 +321,7 @@ class KafkaApis(val requestChannel: RequestChannel, errorReadingData) { debug("Returning fetch response %s for fetch request with correlation id %d to client %s" .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) - //creating a view of (TopicAndPartition -> data) from dataRead for creating a fetch response + // creating a view of (TopicAndPartition -> data) from dataRead for creating a fetch response val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data)) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { @@ -329,7 +329,7 @@ class KafkaApis(val requestChannel: RequestChannel, fetchRequest.clientId)) // create a list of (topic, partition) pairs to use as keys for this delayed request val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_)) - //creating a view of (TopicAndPartition -> offset) from dataRead for constructing a DelayedFetch request + // creating a view of (TopicAndPartition -> offset) from dataRead for constructing a DelayedFetch request val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.mapValues(_.offset)) // add the fetch request for watch if it's not satisfied, otherwise send the response back val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 6d7da4d..8315d1c 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -323,7 +323,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg minCleanableRatio = config.logCleanerMinCleanRatio, compact = config.logCleanupPolicy.trim.toLowerCase == "compact") val defaultProps = defaultLogConfig.toProps - //creating a view of topic configs used for creating LogManager + // creating a configsView, used for creating LogManager val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps,_)) // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 766ab3b..7e4e66e 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -146,7 +146,7 @@ object ReplicaVerificationTool extends Logging { .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) - //creating a view of leadersPerBroker(LeaderId -> Seq[TopicAndPartition]) for creating a ReplicaBuffer + // creating a leadersPerBrokerView(LeaderId -> Seq[TopicAndPartition]) for creating a ReplicaBuffer val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.map( -- 1.9.3 (Apple Git-50) From 835fa3b4f901a1337723d9e2e36ce5a633d14d35 Mon Sep 17 00:00:00 2001 From: mgharat Date: Tue, 30 Sep 2014 23:17:17 -0700 Subject: [PATCH 4/5] Removed comments and changed variable names as per the reviews --- .../main/scala/kafka/admin/ReassignPartitionsCommand.scala | 5 ++--- core/src/main/scala/kafka/server/DelayedProduce.scala | 7 +++---- core/src/main/scala/kafka/server/KafkaApis.scala | 12 +++++------- core/src/main/scala/kafka/server/KafkaServer.scala | 5 ++--- 4 files changed, 12 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index a6d42c6..78f268d 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -120,11 +120,10 @@ object ReassignPartitionsCommand extends Logging { private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) :Map[TopicAndPartition, ReassignmentStatus] = { - // creating a view of partitions being reassigned to be passed to checkIfPartitionReassignmentSucceeded - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) + val partitionsBeingReassignedView = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) partitionsToBeReassigned.map { topicAndPartition => (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, - topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) + topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassignedView)) } } diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index f613915..cfc07f6 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -57,10 +57,9 @@ class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey], } def respond(offsetManager: OffsetManager): RequestOrResponse = { - // creating a responseStatusView from partitionStatus, used for constructing the ProducerResponse - val responseStatus = partitionStatus.mapValues(status => status.responseStatus) + val responseStatusView = partitionStatus.mapValues(status => status.responseStatus) - val errorCode = responseStatus.find { case (_, status) => + val errorCode = responseStatusView.find { case (_, status) => status.error != ErrorMapping.NoError }.map(_._2.error).getOrElse(ErrorMapping.NoError) @@ -69,7 +68,7 @@ class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey], } val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize)) - .getOrElse(ProducerResponse(produce.correlationId, responseStatus)) + .getOrElse(ProducerResponse(produce.correlationId, responseStatusView)) response } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7d5426a..67b3827 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -321,20 +321,18 @@ class KafkaApis(val requestChannel: RequestChannel, errorReadingData) { debug("Returning fetch response %s for fetch request with correlation id %d to client %s" .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) - // creating a view of (TopicAndPartition -> data) from dataRead for creating a fetch response - val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data)) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) + val responseView = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data)) + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(responseView))) } else { debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId, fetchRequest.clientId)) // create a list of (topic, partition) pairs to use as keys for this delayed request val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_)) - // creating a view of (TopicAndPartition -> offset) from dataRead for constructing a DelayedFetch request - val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.mapValues(_.offset)) + val delayedFetchView = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.mapValues(_.offset)) // add the fetch request for watch if it's not satisfied, otherwise send the response back - val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) + val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetchView) if (satisfiedByMe) - fetchRequestPurgatory.respond(delayedFetch) + fetchRequestPurgatory.respond(delayedFetchView) } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 8315d1c..0e4e1ac 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -323,8 +323,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg minCleanableRatio = config.logCleanerMinCleanRatio, compact = config.logCleanupPolicy.trim.toLowerCase == "compact") val defaultProps = defaultLogConfig.toProps - // creating a configsView, used for creating LogManager - val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps,_)) + val configsView = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps,_)) // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, dedupeBufferSize = config.logCleanerDedupeBufferSize, @@ -335,7 +334,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg backOffMs = config.logCleanerBackoffMs, enableCleaner = config.logCleanerEnable) new LogManager(logDirs = config.logDirs.map(new File(_)).toArray, - topicConfigs = configs, + topicConfigs = configsView, defaultConfig = defaultLogConfig, cleanerConfig = cleanerConfig, ioThreads = config.numRecoveryThreadsPerDataDir, -- 1.9.3 (Apple Git-50) From a9b6125fe08797aa9788175dc93bacd713109515 Mon Sep 17 00:00:00 2001 From: mgharat Date: Tue, 30 Sep 2014 23:18:30 -0700 Subject: [PATCH 5/5] Removed comments and changed variable names as per the reviews --- core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 7e4e66e..f02f912 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -147,7 +147,7 @@ object ReplicaVerificationTool extends Logging { debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) // creating a leadersPerBrokerView(LeaderId -> Seq[TopicAndPartition]) for creating a ReplicaBuffer - val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( + val leadersPerBrokerView: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( topicMetadataResponse => topicMetadataResponse.partitionsMetadata.map( partitionMetadata => @@ -155,10 +155,10 @@ object ReplicaVerificationTool extends Logging { ).groupBy(_._2) .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case(topicAndPartition, leaderId) => topicAndPartition }) - debug("Leaders per broker: " + leadersPerBroker) + debug("Leaders per broker: " + leadersPerBrokerView) val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, - leadersPerBroker, + leadersPerBrokerView, topicAndPartitionsPerBroker.size, brokerMap, initialOffsetTime, -- 1.9.3 (Apple Git-50)