From 8480b8646d9a82d00f5ec7101b8e9639d4834358 Mon Sep 17 00:00:00 2001
From: Jay Kreps
Date: Wed, 17 Jul 2013 14:13:05 -0700
Subject: [PATCH] Add an optional partition key to control which partition the
 message is sent to irrespective of the stored key.

---
 .../scala/kafka/producer/ConsoleProducer.scala     |  2 +-
 .../scala/kafka/producer/DefaultPartitioner.scala  |  4 +--
 .../main/scala/kafka/producer/KeyedMessage.scala   | 18 ++++++++++--
 .../main/scala/kafka/producer/Partitioner.scala    |  4 +--
 core/src/main/scala/kafka/producer/Producer.scala  |  2 +-
 .../kafka/producer/async/DefaultEventHandler.scala | 11 +++----
 .../test/scala/other/kafka/TestLogCleaning.scala   |  4 +--
 .../scala/unit/kafka/integration/FetcherTest.scala |  2 +-
 .../unit/kafka/producer/AsyncProducerTest.scala    | 31 ++++++++++----------
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 14 ++++-----
 10 files changed, 53 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 5539bce..59222a2 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -60,7 +60,7 @@ object ConsoleProducer {
                            .withRequiredArg
                            .describedAs("queue enqueuetimeout ms")
                            .ofType(classOf[java.lang.Long])
-                           .defaultsTo(0)
+                           .defaultsTo(Int.MaxValue)
     val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
                            .withRequiredArg
                            .describedAs("request required acks")
diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
index 9bffeb6..df1a66f 100644
--- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
+++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala
@@ -20,10 +20,10 @@ package kafka.producer
 
 import kafka.utils._
 
-private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] {
+private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner {
   private val random = new java.util.Random
   
-  def partition(key: T, numPartitions: Int): Int = {
+  def partition(key: Any, numPartitions: Int): Int = {
     Utils.abs(key.hashCode) % numPartitions
   }
 }
diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala
index b13c4ec..388bc9b 100644
--- a/core/src/main/scala/kafka/producer/KeyedMessage.scala
+++ b/core/src/main/scala/kafka/producer/KeyedMessage.scala
@@ -18,13 +18,25 @@ package kafka.producer
 
 /**
- * A topic, key, and value
+ * A topic, key, and value.
+ * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
  */
-case class KeyedMessage[K, V](val topic: String, val key: K, val message: V) {
+case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
   if(topic == null)
     throw new IllegalArgumentException("Topic cannot be null.")
   
-  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], message)
+  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
+  
+  def this(topic: String, key: K, message: V) = this(topic, key, key, message)
+  
+  def partitionKey = {
+    if(partKey != null)
+      partKey
+    else if(hasKey)
+      key
+    else
+      null
+  }
   
   def hasKey = key != null
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/producer/Partitioner.scala b/core/src/main/scala/kafka/producer/Partitioner.scala
index 9ee61c7..efe6d6d 100644
--- a/core/src/main/scala/kafka/producer/Partitioner.scala
+++ b/core/src/main/scala/kafka/producer/Partitioner.scala
@@ -23,11 +23,11 @@ package kafka.producer
  * Implementations will be constructed via reflection and are required to have a constructor that takes a single
  * VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation.
  */
-trait Partitioner[T] {
+trait Partitioner {
   /**
    * Uses the key to calculate a partition bucket id for routing
    * the data to the appropriate broker partition
    * @return an integer between 0 and numPartitions-1
    */
-  def partition(key: T, numPartitions: Int): Int
+  def partition(key: Any, numPartitions: Int): Int
 }
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index bb16a29..ba94e87 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -56,7 +56,7 @@ class Producer[K,V](val config: ProducerConfig,
   def this(config: ProducerConfig) =
     this(config,
          new DefaultEventHandler[K,V](config,
-                                      Utils.createObject[Partitioner[K]](config.partitionerClass, config.props),
+                                      Utils.createObject[Partitioner](config.partitionerClass, config.props),
                                       Utils.createObject[Encoder[V]](config.serializerClass, config.props),
                                       Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                       new ProducerPool(config)))
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 1ecaeaa..48ddb6a 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic._
 import kafka.api.{TopicMetadata, ProducerRequest}
 
 class DefaultEventHandler[K,V](config: ProducerConfig,
-                               private val partitioner: Partitioner[K],
+                               private val partitioner: Partitioner,
                                private val encoder: Encoder[V],
                                private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
@@ -126,9 +126,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     events.map{e =>
       try {
         if(e.hasKey)
-          serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = e.key, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
+          serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
         else
-          serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
+          serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(bytes = encoder.toBytes(e.message)))
       } catch {
         case t =>
           producerStats.serializationErrorRate.mark()
@@ -149,7 +149,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     try {
       for (message <- messages) {
         val topicPartitionsList = getPartitionListForTopic(message)
-        val partitionIndex = getPartition(message.topic, message.key, topicPartitionsList)
+        val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
         val brokerPartition = topicPartitionsList(partitionIndex)
 
         // postpone the failure until the send operation, so that requests for other brokers are handled correctly
@@ -196,11 +196,12 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   /**
    * Retrieves the partition id and throws an UnknownTopicOrPartitionException if
    * the value of partition is not between 0 and numPartitions-1
+   * @param topic The topic
    * @param key the partition key
    * @param topicPartitionList the list of available partitions
    * @return the partition id
    */
-  private def getPartition(topic: String, key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
+  private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
     val numPartitions = topicPartitionList.size
     if(numPartitions <= 0)
       throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
index 0bef218..22b16e5 100644
--- a/core/src/test/scala/other/kafka/TestLogCleaning.scala
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -260,9 +260,9 @@ object TestLogCleaning {
       val delete = i % 100 < percentDeletes
       val msg = 
         if(delete)
-          KeyedMessage[String, String](topic = topic, key = key.toString, message = null)
+          new KeyedMessage[String, String](topic = topic, key = key.toString, message = null)
         else
-          KeyedMessage[String, String](topic = topic, key = key.toString, message = i.toString)
+          new KeyedMessage[String, String](topic = topic, key = key.toString, message = i.toString)
       producer.send(msg)
       producedWriter.write(TestRecord(topic, key, i, delete).toString)
       producedWriter.newLine()
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 67ed201..b1d56d6 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -88,7 +88,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
                                           new StringEncoder())
       val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray
       messages += conf.brokerId -> ms
-      producer.send(ms.map(m => KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
+      producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*)
       producer.close()
       count += ms.size
     }
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index f2f91e8..74a2743 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -158,11 +158,12 @@ class AsyncProducerTest extends JUnit3Suite {
   @Test
   def testPartitionAndCollateEvents() {
     val producerDataList = new ArrayBuffer[KeyedMessage[Int,Message]]
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
-    producerDataList.append(new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
+    // use bogus key and partition key override for some messages
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = 0, message = new Message("msg1".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = -99, partKey = 1, message = new Message("msg2".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = 2, message = new Message("msg3".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic1", key = -101, partKey = 3, message = new Message("msg4".getBytes)))
+    producerDataList.append(new KeyedMessage[Int,Message]("topic2", key = 4, message = new Message("msg5".getBytes)))
 
     val props = new Properties()
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
@@ -179,8 +180,8 @@
     topicPartitionInfos.put("topic1", topic1Metadata)
     topicPartitionInfos.put("topic2", topic2Metadata)
 
-    val intPartitioner = new Partitioner[Int] {
-      def partition(key: Int, numPartitions: Int): Int = key % numPartitions
+    val intPartitioner = new Partitioner {
+      def partition(key: Any, numPartitions: Int): Int = key.asInstanceOf[Int] % numPartitions
     }
 
     val config = new ProducerConfig(props)
@@ -195,9 +196,9 @@
 
     val topic1Broker1Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
                                                                    new KeyedMessage[Int,Message]("topic1", 2, new Message("msg3".getBytes)))
-    val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 3, new Message("msg4".getBytes)))
+    val topic1Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", -101, 3, new Message("msg4".getBytes)))
     val topic2Broker1Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 4, new Message("msg5".getBytes)))
-    val topic2Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", 1, new Message("msg2".getBytes)))
+    val topic2Broker2Data = ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic2", -99, 1, new Message("msg2".getBytes)))
     val expectedResult = Some(Map(
         0 -> Map(
               TopicAndPartition("topic1", 0) -> topic1Broker1Data,
@@ -225,7 +226,7 @@
     val producerPool = new ProducerPool(config)
 
     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
@@ -285,7 +286,7 @@
     val producerDataList = new ArrayBuffer[KeyedMessage[String,String]]
     producerDataList.append(new KeyedMessage[String,String]("topic1", "msg1"))
     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
@@ -332,7 +333,7 @@
     val producerPool = new ProducerPool(config)
 
     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
@@ -373,7 +374,7 @@
     val msgs = TestUtils.getMsgStrings(10)
 
     val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         partitioner = null.asInstanceOf[Partitioner],
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
@@ -506,6 +507,6 @@
   }
 }
 
-class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = -1
+class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner {
+  def partition(data: Any, numPartitions: Int): Int = -1
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 6b343e3..148bb4b 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -525,18 +525,18 @@ class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
   override def toBytes(n: Int) = n.toString.getBytes
 }
 
-class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = {
-    (data.length % numPartitions)
+class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner {
+  def partition(data: Any, numPartitions: Int): Int = {
+    (data.asInstanceOf[String].length % numPartitions)
   }
 }
 
-class HashPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = {
+class HashPartitioner(props: VerifiableProperties = null) extends Partitioner {
+  def partition(data: Any, numPartitions: Int): Int = {
     (data.hashCode % numPartitions)
   }
 }
 
-class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner[Int] {
-  def partition(data: Int, numPartitions: Int): Int = data
+class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner {
+  def partition(data: Any, numPartitions: Int): Int = data.asInstanceOf[Int]
 }
-- 
1.7.10.2 (Apple Git-33)
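
As a usage illustration (not part of the patch itself), the sketch below shows how the new KeyedMessage constructors might be exercised through this producer API. The broker address, topic name, and key values are assumptions made up for the example; the constructor behavior follows the KeyedMessage and DefaultEventHandler changes above.

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object PartitionKeyExample {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("metadata.broker.list", "localhost:9092") // assumed local broker
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        val producer = new Producer[String, String](new ProducerConfig(props))

        // Three-argument constructor: the stored key doubles as the partition
        // key, via this(topic, key, key, message).
        producer.send(new KeyedMessage[String, String]("events", "user-42", "login"))

        // Four-argument primary constructor: the message is stored with key
        // "user-42" but routed as if its key were "session-7"; the partition
        // key itself is never written to the log.
        producer.send(new KeyedMessage[String, String]("events", "user-42", "session-7", "click"))

        producer.close()
      }
    }

DefaultEventHandler passes message.partitionKey (partKey if non-null, otherwise the stored key) to the Partitioner, so the override affects routing only; consumers still see the stored key.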