Flink / FLINK-20999

Confluent Avro Format should document how to serialize Kafka keys



      Description

      The Confluent Avro Format documentation only shows examples of how to serialize/deserialize Kafka values. Also, the parameter descriptions do not always make clear whether a parameter influences the source or the sink behaviour, IMHO.

      This is surprising, especially in the context of a Kafka sink connector, since keys are such an important concept in that case.

      Adding examples of how to serialize/deserialize Kafka keys would add clarity.

      While it can be argued that a connector format is independent of the underlying storage, showing Kafka-oriented examples in this case (i.e., with a notion of "key" and "value") probably makes sense here, since this connector is very much designed with Kafka in mind.

       

      I'm happy to submit a PR with all of this if the suggested change is approved.

       

      I suggest adding the following:

      Writing to Kafka while keeping the keys in "raw" big-endian format:

      CREATE TABLE OUTPUT_TABLE (
        user_id BIGINT, 
        item_id BIGINT, 
        category_id BIGINT, 
        behavior STRING
       ) WITH (
       'connector' = 'kafka',
       'topic' = 'user_behavior',
       'properties.bootstrap.servers' = 'localhost:9092',
      
       'key.format' = 'raw',
       'key.raw.endianness' = 'big-endian',
       'key.fields' = 'user_id',
      
       'value.format' = 'avro-confluent',
       'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
       'value.avro-confluent.schema-registry.subject' = 'user_behavior'
       )
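      As an illustration of what the 'raw' format with big-endian byte order produces on the wire, a minimal Python sketch (this is not Flink code; the assumption is that a BIGINT key is written as an 8-byte big-endian two's-complement value, matching 'key.raw.endianness' = 'big-endian'):

      ```python
      import struct

      # Sketch only: with 'key.format' = 'raw' and big-endian byte order, a
      # BIGINT key such as user_id is assumed to land on the Kafka topic as
      # 8 bytes in network byte order. struct's '>q' (big-endian signed
      # 64-bit integer) illustrates that layout.
      def encode_bigint_key_big_endian(user_id: int) -> bytes:
          return struct.pack(">q", user_id)

      def decode_bigint_key_big_endian(raw: bytes) -> int:
          return struct.unpack(">q", raw)[0]
      ```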

       

      Writing to Kafka while registering both the key and the value in the schema registry:

      CREATE TABLE OUTPUT_TABLE (
        user_id BIGINT, 
        item_id BIGINT, 
        category_id BIGINT, 
        behavior STRING
       ) WITH (
       'connector' = 'kafka',
       'topic' = 'user_behavior',
       'properties.bootstrap.servers' = 'localhost:9092',
      
      
       -- => this will register a {user_id: long} Avro type in the schema registry.
       -- Watch out: schema evolution in the context of a Kafka key is almost never backward nor
       -- forward compatible in practice due to hash partitioning.
       'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
       'key.avro-confluent.schema-registry.subject' = 'user_behavior_key',
       'key.format' = 'avro-confluent',
       'key.fields' = 'user_id',
      
       'value.format' = 'avro-confluent',
       'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
       'value.avro-confluent.schema-registry.subject' = 'user_behavior_value'
       )
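      The comment above notes that a {user_id: long} Avro type gets registered for the key. As a rough illustration of the schema stored under the 'user_behavior_key' subject (the record name and namespace Flink generates are implementation details; the ones below are placeholders, not what Flink actually emits):

      ```python
      import json

      # Illustration only: a single BIGINT key field roughly corresponds to
      # an Avro record with one 'long' field. The record name here is a
      # placeholder assumption, not the name Flink would register.
      key_schema = {
          "type": "record",
          "name": "record",  # placeholder; Flink chooses its own name
          "fields": [{"name": "user_id", "type": "long"}],
      }

      # The registry stores the schema as its JSON representation.
      key_schema_json = json.dumps(key_schema)
      ```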
       

      Reading from Kafka with both the key and the value schema in the registry, while resolving field-name clashes:

      CREATE TABLE INPUT_TABLE (
        -- user_id as read from the kafka key:
        from_kafka_key_user_id BIGINT,
       
        -- user_id, and other fields, as read from the kafka value:
        user_id BIGINT, 
        item_id BIGINT, 
        category_id BIGINT, 
        behavior STRING
       ) WITH (
       'connector' = 'kafka',
       'topic' = 'user_behavior',
       'properties.bootstrap.servers' = 'localhost:9092',
      
      
       'key.format' = 'avro-confluent',
       'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
       'key.fields' = 'from_kafka_key_user_id',
      
       -- Adds a column prefix when mapping the avro fields of the kafka key to columns of this Table
       -- to avoid clashes with avro fields of the value (both contain 'user_id' in this example)
       'key.fields-prefix' = 'from_kafka_key_',
      
       'value.format' = 'avro-confluent',
       -- do not duplicate the key fields in the value since they are already mapped above
       'value.fields-include' = 'EXCEPT_KEY',
       'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
       )
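      The name resolution that 'key.fields-prefix' performs can be sketched as follows (illustration only, not Flink code): the table columns listed in 'key.fields' carry the prefix, while the Avro field names inside the Kafka key do not, so stripping the prefix recovers the Avro field name.

      ```python
      # Sketch of the 'key.fields-prefix' mapping assumed above: the table
      # column 'from_kafka_key_user_id' corresponds to the Avro field
      # 'user_id' inside the Kafka key, avoiding a clash with the value's
      # own 'user_id' column.
      PREFIX = "from_kafka_key_"

      def avro_key_field_for_column(column: str, prefix: str = PREFIX) -> str:
          # Assumption for this illustration: every key column carries the prefix.
          if not column.startswith(prefix):
              raise ValueError(f"key column {column!r} lacks prefix {prefix!r}")
          return column[len(prefix):]
      ```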


            People

            • Assignee: Svend Vanderveken
            • Reporter: Svend Vanderveken
