From 6b0d6c0a58867ba239482ee61fdc9e8544ceee95 Mon Sep 17 00:00:00 2001 From: Igor Maravic Date: Thu, 7 May 2015 10:54:10 +0200 Subject: [PATCH] KAFKA-2176 DefaultPartitioner now produces consistent hashes for Arrays --- .../scala/kafka/producer/DefaultPartitioner.scala | 9 +- .../kafka/producer/DefaultPartitionerTest.scala | 105 +++++++++++++++++++++ 2 files changed, 111 insertions(+), 3 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/producer/DefaultPartitionerTest.scala diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala index 1141ed1..3cfb892 100755 --- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala +++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala @@ -22,9 +22,12 @@ import kafka.utils._ import org.apache.kafka.common.utils.Utils class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner { - private val random = new java.util.Random - def partition(key: Any, numPartitions: Int): Int = { - Utils.abs(key.hashCode) % numPartitions + deepHashCode(key) % numPartitions + } + + private def deepHashCode(key: Any): Int = key match { + case key: Array[_] => key.foldLeft(1)(31 * _ + deepHashCode(_)) + case _ => Utils.abs(key.##) } } diff --git a/core/src/test/scala/unit/kafka/producer/DefaultPartitionerTest.scala b/core/src/test/scala/unit/kafka/producer/DefaultPartitionerTest.scala new file mode 100644 index 0000000..6868cb2 --- /dev/null +++ b/core/src/test/scala/unit/kafka/producer/DefaultPartitionerTest.scala @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.producer + +import junit.framework.Assert._ +import kafka.producer.DefaultPartitioner +import kafka.utils.Logging +import org.junit.Test +import org.scalatest.junit.JUnit3Suite + +class DefaultPartitionerTest extends JUnit3Suite with Logging { + val defaultPartitioner = new DefaultPartitioner() + val numPartitions = 10000 + + @Test + def testPartitioningEqualStringsGetAssignedToSamePartition() { + val string1 = "string" + val string2 = "string" + + assertEquals(defaultPartitioner.partition(string1, numPartitions), + defaultPartitioner.partition(string2, numPartitions)) + } + + @Test + def testPartitioningNonEqualStringsDoNotGetAssignedToSamePartition() { + val string1 = "string_1" + val string2 = "string_2" + + assertTrue(defaultPartitioner.partition(string1, numPartitions) != + defaultPartitioner.partition(string2, numPartitions)) + } + + @Test + def testPartitioningByteArraysWithEqualContentGetAssignedToSamePartition() { + val byteArray1 = "string".getBytes("UTF-8") + val byteArray2 = "string".getBytes("UTF-8") + + assertEquals(defaultPartitioner.partition(byteArray1, numPartitions), + defaultPartitioner.partition(byteArray2, numPartitions)) + } + + @Test + def testPartitioningByteArraysWithNonEqualContentDoNotGetAssignedToSamePartition() { + val byteArray1 = "string_1".getBytes("UTF-8") + val byteArray2 = "string_2".getBytes("UTF-8") + + assertTrue(defaultPartitioner.partition(byteArray1, numPartitions) != + defaultPartitioner.partition(byteArray2, numPartitions)) + } + + @Test + def testPartitioningByteListsWithEqualContentGetAssignedToSamePartition() { + val byteList1 = "string".getBytes("UTF-8").toList + val byteList2 = "string".getBytes("UTF-8").toList + + assertEquals(defaultPartitioner.partition(byteList1, numPartitions), + defaultPartitioner.partition(byteList2, numPartitions)) + } + + @Test + def testPartitioningByteListsWithNonEqualContentDoNotGetAssignedToSamePartition() { + val byteList1 = "string_1".getBytes("UTF-8").toList + val byteList2 = "string_2".getBytes("UTF-8").toList + + assertTrue(defaultPartitioner.partition(byteList1, numPartitions) != + defaultPartitioner.partition(byteList2, numPartitions)) + } + + @Test + def testPartitioningArrayOfByteArraysWithEqualContentGetAssignedToSamePartition() { + val array1 = Array("string".getBytes("UTF-8"), + "string".getBytes("UTF-8")) + val array2 = Array("string".getBytes("UTF-8"), + "string".getBytes("UTF-8")) + + assertEquals(defaultPartitioner.partition(array1, numPartitions), + defaultPartitioner.partition(array2, numPartitions)) + } + + @Test + def testPartitioningArrayOfByteArraysWithNonEqualContentDoNotGetAssignedToSamePartition() { + val array1 = Array("string_1".getBytes("UTF-8"), + "string_1".getBytes("UTF-8")) + val array2 = Array("string_2".getBytes("UTF-8"), + "string_2".getBytes("UTF-8")) + + assertTrue(defaultPartitioner.partition(array1, numPartitions) != + defaultPartitioner.partition(array2, numPartitions)) + } +} -- 2.3.3