From 63e1c7b2cb8133102b50aa1872696de0b46245c5 Mon Sep 17 00:00:00 2001 From: lvfangmin Date: Mon, 13 Apr 2015 17:40:27 +0800 Subject: [PATCH] Patch for KAFKA-2056: PartitionAssignorTest.testRangePartitionAssignor transient failure --- core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala index 1910fcb..9a6fd1a 100644 --- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -43,7 +43,7 @@ class PartitionAssignorTest extends JUnit3Suite with Logging { val topicPartitionCounts = Map((1 to topicCount).map(topic => { ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount))) }).toSeq:_*) - + val subscriptions = Map((1 to consumerCount).map(consumer => { val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1)) ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true)) @@ -170,6 +170,7 @@ private object PartitionAssignorTest extends Logging { // check for uniqueness (i.e., any partition should be assigned to exactly one consumer stream) val globalAssignment = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() assignments.foreach(assignment => { + if (assignment == null) return assignment.foreach { case(topicPartition, owner) => val previousOwnerOpt = globalAssignment.put(topicPartition, owner) assertTrue("Scenario %s: %s is assigned to two owners.".format(scenario, topicPartition), previousOwnerOpt.isEmpty) -- 2.3.2 (Apple Git-55)