From 44fee3d7de124be49aaf9927bfd23f40c39180df Mon Sep 17 00:00:00 2001 From: Gabriel Reid Date: Tue, 26 May 2015 21:24:40 +0200 Subject: [PATCH] KAFKA-2223 Hash smearing in hash-based partitioner Apply hash "smearing" in hash-based partitioners to ensure a better distribution of keys over partitions. --- .../java/org/apache/kafka/common/utils/Utils.java | 19 ++++++++++ .../kafka/producer/ByteArrayPartitioner.scala | 2 +- .../scala/kafka/producer/DefaultPartitioner.scala | 2 +- .../kafka/producer/DefaultPartitionerTest.scala | 41 ++++++++++++++++++++++ .../test/scala/unit/kafka/utils/TestUtils.scala | 3 +- 5 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/producer/DefaultPartitionerTest.scala diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index f73eedb..9d7a00f 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -479,4 +479,23 @@ public class Utils { public static String readFileAsString(String path) throws IOException { return Utils.readFileAsString(path, Charset.defaultCharset()); } + + /** + * Applies a supplemental hashing function to an integer, increasing variability in lower-order bits. + * This method is intended to avoid collisions in functions which rely on variance in the lower bits of a hash + * code (e.g. hash partitioning). + * @param hashCode the hash code to be smeared + * @return the smeared hash code, with higher-order bits smeared into the lower-order bits + */ + // The following comments and code are taken directly from Guava's com.google.common.collect.Hashing class + // This method was written by Doug Lea with assistance from members of JCP + // JSR-166 Expert Group and released to the public domain, as explained at + // http://creativecommons.org/licenses/publicdomain + // + // As of 2010/06/11, this method is identical to the (package private) hash + // method in OpenJDK 7's java.util.HashMap class. + public static int smearHash(int hashCode) { + hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12); + return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4); + } } diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala index e6b100e..35284fc 100755 --- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala +++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala @@ -23,6 +23,6 @@ import org.apache.kafka.common.utils.Utils class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner { def partition(key: Any, numPartitions: Int): Int = { - Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions + Utils.abs(Utils.smearHash(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]]))) % numPartitions } } diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala index 1141ed1..3717ba0 100755 --- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala +++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala @@ -25,6 +25,6 @@ class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner private val random = new java.util.Random def partition(key: Any, numPartitions: Int): Int = { - Utils.abs(key.hashCode) % numPartitions + Utils.abs(Utils.smearHash(key.hashCode)) % numPartitions } } diff --git a/core/src/test/scala/unit/kafka/producer/DefaultPartitionerTest.scala b/core/src/test/scala/unit/kafka/producer/DefaultPartitionerTest.scala new file mode 100644 index 0000000..0c988ca --- /dev/null +++ b/core/src/test/scala/unit/kafka/producer/DefaultPartitionerTest.scala @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.producer + +import kafka.producer.DefaultPartitioner +import org.junit.Test +import org.junit.Assert.assertTrue + +class DefaultPartitionerTest { + + @Test + def testPartition() { + val partitioner = new DefaultPartitioner(null) + val partitions = (0 to 1000 by 10).map(partitioner.partition(_, 10)) + val countsPerPartition = partitions.groupBy(identity).mapValues(_.size) + + // Check that each partition has received at least 8 entries (perfectly balanced would be 10 + // entries per partition). This is relevant here because the lower-order bits the hash of each + // key are the same, so this test passing is reliant on the hash smearing + for (partition <- (0 to 9)) { + assertTrue("Partition " + partition + " has less than 8 entries", + countsPerPartition.getOrElse(partition, 0) >= 8) + } + } + +} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 17e9fe4..2f0dd27 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -25,6 +25,7 @@ import java.util.Properties import charset.Charset import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils._ import collection.mutable.ListBuffer @@ -887,7 +888,7 @@ class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner{ class HashPartitioner(props: VerifiableProperties = null) extends Partitioner { def partition(data: Any, numPartitions: Int): Int = { - (data.hashCode % numPartitions) + Utils.abs(Utils.smearHash(data.hashCode)) % numPartitions } } -- 2.3.2 (Apple Git-55)