From b10a5d6966b9b5d40a790f4fea91284fcc64c09e Mon Sep 17 00:00:00 2001 From: lvfangmin Date: Tue, 14 Apr 2015 17:05:03 +0800 Subject: [PATCH] Patch for KAFKA-2056: PartitionAssignorTest.testRangePartitionAssignor transient failure --- core/src/main/scala/kafka/consumer/PartitionAssignor.scala | 6 ++++++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 4 +--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index 4afda8b..849284a 100755 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -112,6 +112,9 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { assignmentForConsumer += (topicPartition -> threadId) }) } + + // assign Map.empty for the consumers which are not associated with topic partitions + ctx.consumers.foreach(consumerId => partitionAssignment.getAndMaybePut(consumerId)) partitionAssignment } } @@ -164,6 +167,9 @@ class RangeAssignor() extends PartitionAssignor with Logging { } } } + + // assign Map.empty for the consumers which are not associated with topic partitions + ctx.consumers.foreach(consumerId => partitionAssignment.getAndMaybePut(consumerId)) partitionAssignment } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e250b94..aa8d940 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -684,9 +684,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, releasePartitionOwnership(topicRegistry) val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val globalPartitionAssignment = partitionAssignor.assign(assignmentContext) - val partitionAssignment = Option(globalPartitionAssignment.get(assignmentContext.consumerId)).getOrElse( - mutable.HashMap.empty[TopicAndPartition, ConsumerThreadId] - ) + val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId) val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) -- 2.3.2 (Apple Git-55)