Kafka / KAFKA-925

Add optional partition key override in producer

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.1
    • Fix Version/s: 0.8.1
    • Component/s: producer
    • Labels:
      None

      Description

      The producer currently uses a single key both for partitioning and as the key stored with the message. These two uses often coincide, but they need not. The key effectively serves two purposes:
      1. Assignment to a partition
      2. Deduplication within a partition

      When a client wants to take advantage of both and the two keys are not the same, it would be nice to allow them to be specified separately.

      To implement this, I added an optional partition key to KeyedMessage. When specified, this key is used for partitioning instead of the message key. The partition key is of type Any, and the type parameter is removed from the partitioner so that it can work with either key.
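
A minimal sketch of the idea described above, using a simplified stand-alone model rather than Kafka's actual classes. The names `SketchMessage`, `SketchPartitioner`, and `HashSketchPartitioner`, the use of `Option`, and the hash-modulo rule are all assumptions made for illustration; the patch itself may differ.

```scala
// Simplified model, NOT Kafka's real KeyedMessage: all names here are hypothetical.
final case class SketchMessage[K, V](
    topic: String,
    key: Option[K],          // stored with the message (e.g. used for deduplication)
    partKey: Option[Any],    // optional override used only for partition assignment
    value: V) {

  // The partition key wins when present; otherwise fall back to the stored key.
  def partitionKey: Option[Any] = partKey.orElse(key)
}

// Untyped partitioner: it accepts either a message key of type K or an
// arbitrary partition key, so it carries no type parameter.
trait SketchPartitioner {
  def partition(key: Any, numPartitions: Int): Int
}

// Assumed hashing rule for illustration: non-negative hash modulo partition count.
object HashSketchPartitioner extends SketchPartitioner {
  def partition(key: Any, numPartitions: Int): Int =
    java.lang.Math.floorMod(key.hashCode, numPartitions)
}
```

In this model, a message built with only a key is partitioned by that key, while a message that also carries a partition key is partitioned by the override and still stores the original key.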

      An alternative would be to allow the partition id to be specified in the KeyedMessage. This would be slightly more convenient when there is no partition key and you instead know the partition number a priori; with the approach taken here, that case must be handled by giving the partition id as the partition key and using an identity partitioner, which is slightly more roundabout. However, specifying the partition id directly is inconsistent with normal partitioning, which requires a key: when the partition is determined by a key, you would have to call your partitioner manually in user code. It seems best to me to take either always a key or always a partition, and since we currently take a key, I stuck with that.
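
The identity-partitioner workaround mentioned above could look roughly like the following. This is a sketch under assumptions: `UntypedPartitioner` is a hypothetical trait mirroring the untyped partitioner shape described in this issue, and the range check with an exception is an illustrative choice, not something the patch is known to do.

```scala
// Hypothetical trait with the untyped shape described in this issue.
trait UntypedPartitioner {
  def partition(key: Any, numPartitions: Int): Int
}

// Treats the partition key as the target partition id and returns it unchanged.
object IdentityPartitioner extends UntypedPartitioner {
  def partition(key: Any, numPartitions: Int): Int = key match {
    case id: Int if id >= 0 && id < numPartitions => id
    case other =>
      // Illustrative error handling: reject anything that is not a valid partition id.
      throw new IllegalArgumentException(
        s"Expected a partition id in [0, $numPartitions), got: $other")
  }
}
```

A caller that already knows the target partition would pass that number as the partition key and configure this partitioner, leaving the stored message key untouched.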

      1. KAFKA-925-v1.patch
        16 kB
        Jay Kreps
      2. KAFKA-925-v2.patch
        18 kB
        Jay Kreps
      3. KAFKA-925-post-commit-v1.patch
        3 kB
        Jay Kreps

        Activity

        Jay Kreps added a comment -

        Updated patch, rebased to trunk.

        Chris Riccomini added a comment -

        Hey Jay,

        Seems pretty reasonable to me. Is the reason for the type change in the Partitioner so that you can handle either keys of type K (key) or keys of any type (part key) using the same partitioner?

        Cheers,
        Chris

        Guozhang Wang added a comment -

        Hi Jay,

        In the DefaultEventHandler, only the key is serialized and sent. The partition key is used to determine the partition and then dropped, so consumers would not be able to read it. Will this be a problem for tools such as MirrorMaker?

        Guozhang

        Jay Kreps added a comment -

        Yes, the idea of this feature is to make it possible to partition by something other than the stored key.

        Jay Kreps added a comment -

        It is definitely true that downstream consumers cannot use the same key, though a generic tool can always just retain the partition by setting the partition number as the partition key and using a partitioner which just uses that number.

        Joel Koshy added a comment -

        +1, looks good to me.

        DefaultPartitioner: Do we need the type parameter anymore?

        Guozhang has a good point about tools such as mirror maker not having access to the original partitioning key.
        However, I can see that it would be clunky as we would then need a partition key serializer as well. Also,
        for something like offset-preserving mirrors we would anyway have the source cluster's partition available,
        so I don't see it as a major issue.

        ConsoleProducer: the enqueue timeout change seems reasonable - I'm assuming it was done to avoid dropping
        messages when piping into ConsoleProducer. Correct?

        Jay Kreps added a comment -

        Committed.

        Jay Kreps added a comment -

        Good point, type parameter is not needed in DefaultPartitioner.

        Yes, mirror maker should always partition with the partition number rather than trying to reverse engineer the client partitioning logic. This additional key is specifically for the case where you want to partition by something OTHER than the stored key.

        For console producer, yes, we should default to not losing data. Not really related to this issue, but a good change.

        Jun Rao added a comment -

        In ConsoleProducer, shouldn't we default "queue-enqueuetimeout-ms" to -1, instead of Int.MaxValue, if we want to make the producer block when the queue is full?

        Jay Kreps added a comment -

        Yeah, fair point. I didn't realize we had a special infinity value, so I used Int.MaxValue to mean "a long time". Attached is a patch that fixes that and cleans up some of the CLI docs.
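
To illustrate the difference between a very large timeout and a "block forever" sentinel, here is a small stand-alone sketch built on `java.util.concurrent.BlockingQueue`. It is not the producer's actual code: `EnqueueSketch` and its `enqueue` helper are hypothetical, and only the negative-value case (block while the queue is full) is taken from the discussion above; the handling of zero and positive timeouts is an assumption.

```scala
import java.util.concurrent.{BlockingQueue, TimeUnit}

object EnqueueSketch {
  // Returns true if the item was queued, false if it was dropped.
  def enqueue[T](queue: BlockingQueue[T], item: T, timeoutMs: Int): Boolean =
    if (timeoutMs < 0) {
      // Negative sentinel (e.g. -1): block until space becomes available.
      queue.put(item)
      true
    } else if (timeoutMs == 0) {
      // Assumed behavior: do not wait; drop the item if the queue is full.
      queue.offer(item)
    } else {
      // Assumed behavior: wait up to the timeout, then drop the item.
      queue.offer(item, timeoutMs.toLong, TimeUnit.MILLISECONDS)
    }
}
```

With a sentinel, blocking is explicit rather than a side effect of a timeout that happens to be about 24.8 days (`Int.MaxValue` milliseconds).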

        Jun Rao added a comment -

        Thanks for the post commit patch. Looks good. +1.


          People

          • Assignee: Jay Kreps
          • Reporter: Jay Kreps
          • Votes: 0
          • Watchers: 5
