KAFKA-957

MirrorMaker needs to preserve the key in the source cluster

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels:
      None

      Description

      Currently, MirrorMaker only propagates the message to the target cluster, but not the associated key.

      1. KAFKA-957.v4.patch
        5 kB
        Guozhang Wang
      2. KAFKA-957.v3.patch
        5 kB
        Guozhang Wang
      3. KAFKA-957.v2.patch
        4 kB
        Guozhang Wang
      4. KAFKA-957.v2.patch
        5 kB
        Guozhang Wang
      5. KAFKA-957.v1.patch
        5 kB
        Guozhang Wang

        Activity

        Jun Rao created issue -
        Guozhang Wang made changes -
        Field Original Value New Value
        Assignee Guozhang Wang [ guozhang ]
        Guozhang Wang added a comment -

        The proposed design is the following:

        1. Keep the ProducerDataChannel for non-keyed messages.

        2. For each message in the consumers' stream, check if its key is null:

        2.1. If yes, put it directly into the ProducerDataChannel.

        2.2. If not, select the producer based on the key's hash value, and call the send function of that producer directly (which may block).

        Patch will be available once the system tests pass.
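
The routing in steps 1 to 2.2 can be sketched roughly as follows. This is a minimal illustration, not the code from the patch: `KeyedSender`, `MirrorRouter`, and the injected `keyHash` function are hypothetical names, and a plain `BlockingQueue` stands in for the ProducerDataChannel.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.ToIntFunction;

// Hypothetical stand-in for a producer's send call; not a Kafka API.
interface KeyedSender {
    void send(byte[] key, byte[] value); // may block, as noted in step 2.2
}

// Hypothetical router illustrating steps 1, 2.1 and 2.2 of the proposal.
final class MirrorRouter {
    private final BlockingQueue<byte[]> channel;  // stands in for ProducerDataChannel
    private final List<KeyedSender> producers;
    private final ToIntFunction<byte[]> keyHash;  // should depend only on the key's content

    MirrorRouter(BlockingQueue<byte[]> channel,
                 List<KeyedSender> producers,
                 ToIntFunction<byte[]> keyHash) {
        this.channel = channel;
        this.producers = producers;
        this.keyHash = keyHash;
    }

    // floorMod keeps the index in [0, producers.size()) even for negative hashes.
    int producerIndexFor(byte[] key) {
        return Math.floorMod(keyHash.applyAsInt(key), producers.size());
    }

    void route(byte[] key, byte[] value) {
        if (key == null) {
            // Step 2.1: non-keyed messages go through the shared channel.
            try {
                channel.put(value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while enqueueing", e);
            }
        } else {
            // Step 2.2: keyed messages always go to the producer chosen by the key's hash.
            producers.get(producerIndexFor(key)).send(key, value);
        }
    }
}
```

Because every message with the same key content reaches the same producer, this arrangement would keep per-key ordering on the way to the target cluster, provided the hash function is content-based.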

        Guozhang Wang added a comment -

        4. Add a ByteArrayPartitioner, since Array[Byte].hashCode returns different values for distinct array objects even when their contents are identical. Force MirrorMaker to use this ByteArrayPartitioner.

        Passed MirrorMaker testcase 5001
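
The hashing problem behind item 4 also shows up in Java, where `byte[]` inherits the identity-based `Object.hashCode`. The sketch below is an illustration only: `ByteArrayPartitionerSketch` is a hypothetical class, not the ByteArrayPartitioner from the patch, and it assumes a partitioner reduces to mapping a key and a partition count to an index.

```java
import java.util.Arrays;

// Hypothetical illustration of content-based partitioning for byte[] keys.
final class ByteArrayPartitionerSketch {

    // Equal key bytes always map to the same partition, whichever array object holds them.
    static int partition(byte[] key, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};

        // Identity hashes: two distinct arrays usually disagree, even with equal content.
        System.out.println("a.hashCode() == b.hashCode(): " + (a.hashCode() == b.hashCode()));

        // Content hashes: guaranteed equal for equal content.
        System.out.println("Arrays.hashCode equal: " + (Arrays.hashCode(a) == Arrays.hashCode(b)));

        System.out.println("partition(a, 8) == partition(b, 8): " + (partition(a, 8) == partition(b, 8)));
    }
}
```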

        Guozhang Wang made changes -
        Attachment KAFKA-957.v1.patch [ 12591954 ]
        Guozhang Wang made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Guozhang Wang added a comment -

        5. Use java.util.Arrays.hashCode to determine which producer a keyed message is sent to.

        6. Check if the partitioner class is specified instead of always trying to override the partitioner.

        Passed testcase 5007 referring to KAFKA-976.
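
Item 6 amounts to applying the byte-array partitioner only as a default. A minimal sketch, assuming the producer settings are held in a `java.util.Properties` object under the `partitioner.class` key; the helper `PartitionerDefaults` is hypothetical, and the fully qualified partitioner class name is an assumption rather than something stated in this ticket.

```java
import java.util.Properties;

// Hypothetical helper: set a default partitioner only when the user has not chosen one.
final class PartitionerDefaults {
    static final String PARTITIONER_KEY = "partitioner.class";
    // Assumed class name, used here purely for illustration.
    static final String BYTE_ARRAY_PARTITIONER = "kafka.producer.ByteArrayPartitioner";

    static Properties withDefaultPartitioner(Properties userProps) {
        Properties props = new Properties();
        props.putAll(userProps); // copy so the caller's object is left untouched
        if (!props.containsKey(PARTITIONER_KEY)) {
            props.setProperty(PARTITIONER_KEY, BYTE_ARRAY_PARTITIONER);
        }
        return props;
    }
}
```

A user-supplied `partitioner.class` is left as-is, so the override only takes effect when nothing was configured.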

        Guozhang Wang made changes -
        Attachment KAFKA-957.v2.patch [ 12592686 ]
        Guozhang Wang made changes -
        Attachment KAFKA-957.v2.patch [ 12592690 ]
        Joel Koshy added a comment -

        Thanks for incorporating 5 and 6. A couple of additional comments:

        • For the two match statements you have it is probably sufficient and
          clearer to just use if (key == null) .... and if (props.contains(..))
        • I'm not so sure if the trace is required but it could be useful. Would
          prefer the following format: "Sending message with key <key>" - no need to
          show the payload. Also, may want to use java.util.Arrays.toString on the
          byte array.
        • Per Jay's offline comments, hashCode in general is a bit unsafe to
          rely on. For example, it could have a non-uniform distribution, or the
          underlying function could change. That said, your usage is safe. Still,
          it should be straightforward to write a custom hash function that we
          can rely on for consistency.
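
The last two suggestions can be sketched together. The trace format follows the wording proposed above; the hash is 32-bit FNV-1a, chosen here only as an example of a fixed, self-contained function, since the thread does not say which custom hash (if any) was intended. `KeyTrace` and both method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helpers for the trace format and a fixed, content-based hash.
final class KeyTrace {

    // Logs only the key, rendered element by element, and omits the payload.
    static String traceLine(byte[] key) {
        return "Sending message with key " + Arrays.toString(key);
    }

    // 32-bit FNV-1a: the result depends only on the bytes, so it cannot change
    // with the JVM or library version the way a platform hashCode might.
    static int stableHash(byte[] key) {
        int h = 0x811C9DC5;        // FNV offset basis
        for (byte b : key) {
            h ^= (b & 0xff);       // mix in one byte, treated as unsigned
            h *= 0x01000193;       // multiply by the FNV prime (wraps modulo 2^32)
        }
        return h;
    }

    public static void main(String[] args) {
        System.out.println(traceLine(new byte[]{1, 2, 3})); // Sending message with key [1, 2, 3]
    }
}
```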
        Guozhang Wang added a comment -

        Thanks for the comment.

        Regarding the first comment: replacing the first match with if () would require a var for creating the config object, so I kept the match there. I have changed the second match to an "if".

        Guozhang Wang made changes -
        Attachment KAFKA-957.v3.patch [ 12592705 ]
        Guozhang Wang added a comment -

        7. Use Utils.abs to make sure the result of the hashCode function is always non-negative.
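
The reason a plain hash needs this treatment: an `int` hash can be negative, a negative value yields a negative remainder under `%`, and `Math.abs(Integer.MIN_VALUE)` is itself negative. The sketch below shows one way to force a non-negative value by clearing the sign bit. `NonNegativeHash` is a hypothetical helper written for illustration; it is not claimed to match the actual Utils.abs implementation.

```java
// Hypothetical helper: turn any int hash into a valid producer index.
final class NonNegativeHash {

    // Clears the sign bit. The result is always >= 0, but it is not the
    // mathematical absolute value (for example, -1 maps to Integer.MAX_VALUE).
    static int nonNegative(int n) {
        return n & 0x7fffffff;
    }

    static int producerIndex(int hash, int numProducers) {
        return nonNegative(hash) % numProducers;
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE) < 0);   // true: Math.abs overflows here
        System.out.println(nonNegative(Integer.MIN_VALUE));    // 0
        System.out.println(producerIndex(-7, 4));              // 1
    }
}
```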

        Guozhang Wang made changes -
        Attachment KAFKA-957.v4.patch [ 12593981 ]
        Joel Koshy added a comment -

        +1

        Joel Koshy added a comment -

        Committed to 0.8

        Joel Koshy made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Joel Koshy made changes -
        Status Resolved [ 5 ] Closed [ 6 ]

          People

          • Assignee:
            Guozhang Wang
            Reporter:
            Jun Rao
          • Votes:
            0
            Watchers:
            3
