Samza / SAMZA-839

KafkaSystemProducer should use the same partitioning hash function as Kafka's producer

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.9.1
    • Fix Version/s: None
    • Component/s: kafka
    • Labels:
      None

      Description

      Samza's KafkaSystemProducer class computes the target partition using:

      abs(envelope.getPartitionKey.hashCode()) % numPartitions

      However, Kafka's new Java producer computes it this way:

      Utils.abs(Utils.murmur2(record.key())) % numPartitions

      This makes it difficult for me to join two data sources on a common key when one source is produced by Samza and the other by a default Kafka producer, because the same key can end up in different partitions of the two topics.
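To make the mismatch concrete, here is a minimal Java sketch that applies both formulas to the same key. It assumes a String key, that Samza hashes the key object with hashCode(), and that the Kafka producer hashes the key's UTF-8 bytes (as a StringSerializer would produce them). The murmur2 method is a reimplementation written to follow Kafka's Utils.murmur2, not a call into kafka-clients, and masking the sign bit stands in for Utils.abs; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical class comparing the two partitioning formulas quoted in this issue.
class PartitionerComparison {

    // Samza-style: abs(key.hashCode()) % numPartitions (Math.abs assumed).
    static int samzaPartition(Object key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    // Kafka-style: abs(murmur2(keyBytes)) % numPartitions.
    // Masking the sign bit is an assumption about how Utils.abs makes the hash non-negative.
    static int kafkaPartition(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // 32-bit MurmurHash2 with the seed and mixing constants used by Kafka's Utils.murmur2.
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;

        // Mix the input four bytes at a time (little-endian).
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) | ((data[i4 + 1] & 0xff) << 8)
                    | ((data[i4 + 2] & 0xff) << 16) | ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix the remaining 1-3 bytes; the case fall-through is intentional.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    public static void main(String[] args) {
        String key = "21";
        int numPartitions = 4;
        System.out.println("Samza-style partition: " + samzaPartition(key, numPartitions));
        System.out.println("Kafka-style partition: "
                + kafkaPartition(key.getBytes(StandardCharsets.UTF_8), numPartitions));
    }
}
```

For the key "21" and four partitions, the Samza-style formula yields partition 3 ("21".hashCode() is 1599) while the murmur2-based formula yields partition 0, so the same key lands in different partitions of the two topics.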

      As a work-around, I have to modify the upstream job (which uses the default Kafka producer) to write with an explicit partition computed with Samza's hashing logic.


          Activity

          navina Navina Ramesh added a comment -

          Kishore Nallan Just curious: which version of the Kafka producer are you using? The partition in KafkaSystemProducer is computed with the default partitioner logic that Kafka provides.
          IIRC, it was introduced when we upgraded to the Kafka 0.8 producer, which takes an integer partition key in the ProducerRecord interface.
          It would be great if you could point out where Utils.murmur2 is used in Kafka. Thanks!

          kishorenc Kishore Nallan added a comment -

          Navina Ramesh You can see it here: https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69
          I use the callback-based producer, which calls the above partitioning function if a partition id is not already set in the `ProducerRecord`, here: https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L333
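The behaviour described above (the partitioner is consulted only when the record carries no explicit partition) can be sketched as a small decision function. This is a simplified model with hypothetical types (SimpleRecord, PartitionSelection), not the actual KafkaProducer source; the hash is passed in so that either formula from the description could be plugged in.

```java
import java.util.Arrays;
import java.util.function.ToIntFunction;

// Hypothetical, simplified stand-in for a producer record whose partition may be unset (null).
final class SimpleRecord {
    final Integer partition;
    final byte[] key;

    SimpleRecord(Integer partition, byte[] key) {
        this.partition = partition;
        this.key = key;
    }
}

class PartitionSelection {

    // Returns the explicit partition when one is set; otherwise hashes the key bytes.
    static int choosePartition(SimpleRecord record, int numPartitions, ToIntFunction<byte[]> hash) {
        if (record.partition != null) {
            if (record.partition < 0 || record.partition >= numPartitions) {
                throw new IllegalArgumentException("Invalid partition: " + record.partition);
            }
            return record.partition;
        }
        // Mask the sign bit so the modulo result is never negative.
        return (hash.applyAsInt(record.key) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Arrays.hashCode is only a placeholder; the 0.8.2 default partitioner linked above uses murmur2.
        ToIntFunction<byte[]> hash = k -> Arrays.hashCode(k);
        byte[] key = {1};
        System.out.println(choosePartition(new SimpleRecord(2, key), 4, hash));    // explicit partition wins
        System.out.println(choosePartition(new SimpleRecord(null, key), 4, hash)); // falls back to the hash
    }
}
```

This is also why the reporter's work-around helps: once the upstream job sets the partition explicitly, the producer's own hash is never used.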
          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Kishore Nallan, thanks for digging it out. That's an interesting discovery. Jiangjie Qin, I remember you mentioning that the Kafka producer's default partitioner had a backward-incompatible change at some point. Was it between 0.8.0 and 0.8.2?

          becket_qin Jiangjie Qin added a comment (edited) -

          Yi Pan Yes, there is a backward-incompatible change in the partitioner between the old producer and the new producer. It is caused by the abs() function mentioned in this ticket.
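That abs() alone can break compatibility is easy to demonstrate: for a negative hash, two common ways of making it non-negative give different results, and therefore different partitions. The sketch below compares Math.abs with masking off the sign bit; which variant each Kafka producer version used is not stated in this thread, so both are shown only as examples, and the class name is hypothetical.

```java
// Two common ways of turning a (possibly negative) hash into a partition number.
class AbsVariants {

    // Math.abs flips the sign, but Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE,
    // so the result can be a negative (invalid) partition.
    static int absPartition(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    // Masking the sign bit is always non-negative, but disagrees with Math.abs for negative hashes.
    static int maskPartition(int hash, int numPartitions) {
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int[] hashes = {7, -1, -7};
        for (int h : hashes) {
            System.out.println(h + ": abs -> " + absPartition(h, 4) + ", mask -> " + maskPartition(h, 4));
        }
        // Edge case: with 3 partitions, Math.abs-based partitioning returns -2 for Integer.MIN_VALUE.
        System.out.println(absPartition(Integer.MIN_VALUE, 3));
    }
}
```

For a hash of -1 and four partitions, the Math.abs variant picks partition 1 while the mask variant picks partition 3, so two producers that agree on the hash but not on abs() still disagree on placement.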

          nickpan47 Yi Pan (Data Infrastructure) added a comment -

          Thanks, Jiangjie Qin. Based on that, I think the best approach would be for Samza to instantiate the partitioner.class defined in the producer config and call the partition function on that object. The reasons are:

          1. The KafkaProducer client only applies partitioner.class to the ProducerRecord's key. Hence, if Kafka picked the partition, it would not be possible to keep the partition key separate from the record key, as Samza supports today.
          2. It would be better to stay in sync with the KafkaProducer's partitioner.class implementation, to avoid the issue reported here.

          Thoughts?
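The proposal could look roughly like this: read a class name from the producer config, instantiate it reflectively, and let it choose the partition for each outgoing message. The KeyPartitioner interface, the ModuloPartitioner default and PartitionerLoader are hypothetical stand-ins; only the config key name partitioner.class comes from Kafka, and Kafka's own partitioner interfaces (which differ between versions) are not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plug-in interface: maps a key to a partition in [0, numPartitions).
interface KeyPartitioner {
    int partition(Object key, int numPartitions);
}

// Hypothetical default, equivalent to Samza's current abs(hashCode) % numPartitions formula.
class ModuloPartitioner implements KeyPartitioner {
    public ModuloPartitioner() {}

    @Override
    public int partition(Object key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }
}

class PartitionerLoader {

    // Instantiates the class named by "partitioner.class", falling back to ModuloPartitioner.
    static KeyPartitioner load(Map<String, String> producerConfig) {
        String className = producerConfig.getOrDefault("partitioner.class", ModuloPartitioner.class.getName());
        try {
            return Class.forName(className)
                    .asSubclass(KeyPartitioner.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate partitioner " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        KeyPartitioner partitioner = load(config);
        System.out.println(partitioner.partition("21", 4));
    }
}
```

Keeping the call on the Samza side, rather than handing the key to KafkaProducer, is what would let the partition key stay separate from the record key.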

          michal.harish Michal Harish added a comment -

          I just came across a related issue. I have some Kafka consumers that expect very specific partitioning, and I am replacing the upstream batch job with a Samza processor. After noticing a suspicious warning that partitioner.class is not a known config, I looked into the code and realized that Samza's Kafka system producer does not actually respect the partitioner.class config. Honoring the partitioner.class given in the Samza system config would therefore be good for a couple of other reasons:

          • sometimes streaming jobs need custom partitioning, not just the default Kafka (or Samza) modulo partitioner
          • it is a bit obscure that partitioner.class is ignored, even though the Samza docs say any Kafka producer config can be passed through

          Also, it should be possible to invoke Kafka's partitioner on both partition keys and record keys, since it takes Any as its argument, but I don't know exactly what role the separate partition key plays in Samza's design.
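Applying one partitioner that accepts any object to either key could be sketched as below. The rule that an explicit partition key wins and the record key is used otherwise is an assumption made for illustration; the thread does not settle what role Samza's separate partition key should play, and KeySelection is a hypothetical name.

```java
import java.util.Objects;
import java.util.function.ToIntBiFunction;

class KeySelection {

    // Assumed rule: use the explicit partition key if present, otherwise the record key.
    static Object partitioningKey(Object partitionKey, Object recordKey) {
        return partitionKey != null
                ? partitionKey
                : Objects.requireNonNull(recordKey, "no key to partition on");
    }

    // Feeds whichever key was selected to a partitioner that accepts any object.
    static int partitionFor(Object partitionKey, Object recordKey, int numPartitions,
                            ToIntBiFunction<Object, Integer> partitioner) {
        return partitioner.applyAsInt(partitioningKey(partitionKey, recordKey), numPartitions);
    }

    public static void main(String[] args) {
        ToIntBiFunction<Object, Integer> modulo = (key, n) -> Math.abs(key.hashCode()) % n;
        System.out.println(partitionFor("21", "ignored", 4, modulo)); // partition key used
        System.out.println(partitionFor(null, "21", 4, modulo));      // record key used
    }
}
```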


            People

            • Assignee: Unassigned
            • Reporter: kishorenc Kishore Nallan
            • Votes: 0
            • Watchers: 7
