diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index 9bffeb6..df1a66f 100644
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -20,10 +20,10 @@ package kafka.producer

 import kafka.utils._

-private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
+private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner {
   private val random = new java.util.Random

-  def partition(key: T, numPartitions: Int): Int = {
+  def partition(key: Any, numPartitions: Int): Int = {
     Utils.abs(key.hashCode) % numPartitions
   }
 }
diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala
index b13c4ec..388bc9b 100644
--- a/core/src/main/scala/kafka/producer/KeyedMessage.scala
+++ b/core/src/main/scala/kafka/producer/KeyedMessage.scala
@@ -18,13 +18,25 @@ package kafka.producer

 /**
- * A topic, key, and value
+ * A topic, key, and value.
+ * If a partition key is provided, it will override the key for the purpose of partitioning but will not be stored.
  */
-case class KeyedMessage[K, V](val topic: String, val key: K, val message: V) {
+case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
   if(topic == null)
     throw new IllegalArgumentException("Topic cannot be null.")

-  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], message)
+  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
+
+  def this(topic: String, key: K, message: V) = this(topic, key, key, message)
+
+  def partitionKey = {
+    if(partKey != null)
+      partKey
+    else if(hasKey)
+      key
+    else
+      null
+  }

   def hasKey = key != null
 }
\ No newline at end of file
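Reviewer note (not part of the patch): the interplay between the three KeyedMessage constructors and partitionKey above is easy to misread, so here is a minimal sketch of what each form resolves to under the patched class. The topic, key, and payload strings are placeholders.

    import kafka.producer.KeyedMessage

    object KeyedMessageSemanticsDemo {
      def main(args: Array[String]) {
        // Two-arg constructor: no key, no partition key => partitionKey is null.
        val unkeyed = new KeyedMessage[String, String]("events", "payload")
        assert(!unkeyed.hasKey && unkeyed.partitionKey == null)

        // Three-arg constructor: the key doubles as the partition key.
        val keyed = new KeyedMessage[String, String]("events", "user-42", "payload")
        assert(keyed.partitionKey == "user-42")

        // Four-arg case-class constructor: partKey drives partitioning,
        // but only the key is stored with the message.
        val overridden = KeyedMessage("events", "user-42", "session-7", "payload")
        assert(overridden.key == "user-42" && overridden.partitionKey == "session-7")
      }
    }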
diff --git a/core/src/main/scala/kafka/producer/Partitioner.scala b/core/src/main/scala/kafka/producer/Partitioner.scala
index 9ee61c7..efe6d6d 100644
--- a/core/src/main/scala/kafka/producer/Partitioner.scala
+++ b/core/src/main/scala/kafka/producer/Partitioner.scala
@@ -23,11 +23,11 @@ package kafka.producer
  * Implementations will be constructed via reflection and are required to have a constructor that takes a single
  * VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation.
  */
-trait Partitioner[T] {
+trait Partitioner {
   /**
    * Uses the key to calculate a partition bucket id for routing
    * the data to the appropriate broker partition
    * @return an integer between 0 and numPartitions-1
    */
-  def partition(key: T, numPartitions: Int): Int
+  def partition(key: Any, numPartitions: Int): Int
 }
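Reviewer note: with the type parameter removed, implementations receive the key as Any and cast it themselves, so a mismatched key type now fails at runtime rather than at compile time. A minimal sketch of a custom partitioner under the new trait (ModuloPartitioner is a hypothetical name, not in the patch):

    import kafka.producer.Partitioner
    import kafka.utils.VerifiableProperties

    // The single-VerifiableProperties constructor is required because
    // partitioner.class instances are created via reflection.
    class ModuloPartitioner(props: VerifiableProperties = null) extends Partitioner {
      def partition(key: Any, numPartitions: Int): Int = {
        // Double modulo keeps the result in [0, numPartitions) even for
        // negative hash codes, including Int.MinValue.
        (key.hashCode % numPartitions + numPartitions) % numPartitions
      }
    }

Wiring it in is a matter of props.put("partitioner.class", "my.pkg.ModuloPartitioner") on the producer properties (package name hypothetical).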
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index c837091..b38480b 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -56,7 +56,7 @@ class Producer[K,V](val config: ProducerConfig,
   def this(config: ProducerConfig) =
     this(config,
          new DefaultEventHandler[K,V](config,
-                                      Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+                                      Utils.createObject[Partitioner](config.partitionerClass, config.props),
                                       Utils.createObject[Encoder[V]](config.serializerClass, config.props),
                                       Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                       new ProducerPool(config)))
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 2e3e383..5d6e7de 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic._
 import kafka.api.{TopicMetadata, ProducerRequest}

 class DefaultEventHandler[K,V](config: ProducerConfig,
-                               private val partitioner: Partitioner[K],
+                               private val partitioner: Partitioner,
                                private val encoder: Encoder[V],
                                private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
@@ -124,9 +124,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     events.map{e =>
       try {
         if(e.hasKey)
-          serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = e.key, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
+          serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
         else
-          serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
+          serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(bytes = encoder.toBytes(e.message)))
       } catch {
         case t =>
           producerStats.serializationErrorRate.mark()
@@ -147,7 +147,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     try {
       for (message <- messages) {
         val topicPartitionsList = getPartitionListForTopic(message)
-        val partitionIndex = getPartition(message.key, topicPartitionsList)
+        val partitionIndex = getPartition(message.partitionKey, topicPartitionsList)
         val brokerPartition = topicPartitionsList(partitionIndex)

         // postpone the failure until the send operation, so that requests for other brokers are handled correctly
@@ -198,7 +198,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
    * @param topicPartitionList the list of available partitions
    * @return the partition id
    */
-  private def getPartition(key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
+  private def getPartition(key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
     val numPartitions = topicPartitionList.size
     if(numPartitions <= 0)
       throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
index 0bef218..22b16e5 100644
--- a/core/src/test/scala/other/kafka/TestLogCleaning.scala
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -260,9 +260,9 @@ object TestLogCleaning {
       val delete = i % 100 < percentDeletes
       val msg =
         if(delete)
-          KeyedMessage[String, String](topic = topic, key = key.toString, message = null)
+          new KeyedMessage[String, String](topic = topic, key = key.toString, message = null)
         else
-          KeyedMessage[String, String](topic = topic, key = key.toString, message = i.toString)
+          new KeyedMessage[String, String](topic = topic, key = key.toString, message = i.toString)
       producer.send(msg)
       producedWriter.write(TestRecord(topic, key, i, delete).toString)
       producedWriter.newLine()
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 845b966..03ff481 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -88,7 +88,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
                                                new StringEncoder())
       val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
       messages += conf.brokerId -> ms
-      producer.send(ms.map(m => KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
+      producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
       producer.close()
       count += ms.size
     }
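Reviewer note: the net effect of the DefaultEventHandler changes above is that routing now hashes message.partitionKey rather than message.key. A simplified, standalone sketch of that selection step; the null-key random fallback is an assumption about the surrounding code, which these hunks do not show:

    // Simplified sketch only; not the actual getPartition body.
    def selectPartition(partitionKey: Any, numPartitions: Int,
                        partitioner: kafka.producer.Partitioner,
                        random: java.util.Random): Int = {
      if(numPartitions <= 0)
        throw new IllegalArgumentException("Invalid number of partitions: " + numPartitions)
      if(partitionKey == null)
        random.nextInt(numPartitions)                      // assumed: unkeyed messages spread randomly
      else
        partitioner.partition(partitionKey, numPartitions) // keyed messages go through the Partitioner
    }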
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 65a67e8..462d9e2 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -158,11 +158,12 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testPartitionAndCollateEvents() {
     val producerDataList = new ArrayBuffer[KeyedMessage[Int,Message]]
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
+    // use bogus key and partition key override for some messages
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = 0, message = new Message("msg1".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = -99, partKey = 1, message = new Message("msg2".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = 2, message = new Message("msg3".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = -101, partKey = 3, message = new Message("msg4".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = 4, message = new Message("msg5".getBytes)))

     val props = new Properties()
     props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
@@ -179,8 +180,8 @@ class AsyncProducerTest extends JUnit3Suite {
     topicPartitionInfos.put("topic1", topic1Metadata)
     topicPartitionInfos.put("topic2", topic2Metadata)

-    val intPartitioner = new Partitioner[Int] {
-      def partition(key: Int, numPartitions: Int): Int = key % numPartitions
+    val intPartitioner = new Partitioner {
+      def partition(key: Any, numPartitions: Int): Int = key.asInstanceOf[Int] % numPartitions
     }

     val config = new ProducerConfig(props)
@@ -195,9 +196,9 @@ class AsyncProducerTest extends JUnit3Suite {

     val topic1Broker1Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
                                                                    new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
-    val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
+    val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", -101, 3, new Message("msg4".getBytes)))
     val topic2Broker1Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
-    val topic2Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
+    val topic2Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", -99, 1, new Message("msg2".getBytes)))
     val expectedResult = Some(Map(
         0 -> Map(
               TopicAndPartition("topic1", 0) -> topic1Broker1Data,
@@ -225,7 +226,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val producerPool = new ProducerPool(config)

     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
@@ -284,7 +285,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val producerDataList = new ArrayBuffer[KeyedMessage[String,String]]
     producerDataList.append(new KeyedMessage[String,String]("topic1", "msg1"))
     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
@@ -331,7 +332,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val producerPool = new ProducerPool(config)
     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
@@ -372,7 +373,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val msgs = TestUtils.getMsgStrings(10)
     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
@@ -503,6 +504,6 @@ class AsyncProducerTest extends JUnit3Suite {
   }
 }

-class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = -1
+class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner {
+  def partition(data: Any, numPartitions: Int): Int = -1
 }
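Reviewer note: the asInstanceOf casts above (and in the TestUtils partitioners below) are where the type-safety loss from dropping Partitioner[T] shows up: a wrong-typed key is now a runtime failure. A tiny sketch of the failure mode:

    val p = new kafka.producer.Partitioner {
      def partition(key: Any, numPartitions: Int): Int = key.asInstanceOf[Int] % numPartitions
    }
    p.partition(42, 4)        // fine: returns 2
    p.partition("oops", 4)    // compiles, but throws ClassCastException at runtime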
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 40bfacb..1f7e686 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -517,18 +517,18 @@ class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
   override def toBytes(n: Int) = n.toString.getBytes
 }

-class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = {
-    (data.length % numPartitions)
+class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner {
+  def partition(data: Any, numPartitions: Int): Int = {
+    (data.asInstanceOf[String].length % numPartitions)
   }
 }

-class HashPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = {
+class HashPartitioner(props: VerifiableProperties = null) extends Partitioner {
+  def partition(data: Any, numPartitions: Int): Int = {
     (data.hashCode % numPartitions)
   }
 }

-class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner[Int] {
-  def partition(data: Int, numPartitions: Int): Int = data
+class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner {
+  def partition(data: Any, numPartitions: Int): Int = data.asInstanceOf[Int]
 }
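Reviewer note: to close, an end-to-end usage sketch against the patched API. The broker list, topic, and payload strings are placeholders; key.serializer.class is left to default to serializer.class, and the default partitioner hashes the partition key.

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object PartKeyExample {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("broker.list", "localhost:9092")  // placeholder broker list
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        val producer = new Producer[String, String](new ProducerConfig(props))
        // Route by session id without storing it: partKey steers partitioning,
        // while only the key and message are written to the log.
        producer.send(KeyedMessage("clicks", "user-42", "session-7", "click payload"))
        producer.close()
      }
    }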