FLINK-19790: Writing MAP<STRING, STRING> to Kafka with JSON format produces incorrect data



      Description

      Running the following SQL script writes incorrect data to Kafka:

      CREATE TEMPORARY TABLE tmp_1 (m MAP<String, String>) WITH (
        'connector' = 'kafka',
        'format' = 'json',
        'properties.bootstrap.servers' = '...',
        'properties.group.id' = '...',
        'topic' = 'tmp-1'
      );
      
      CREATE TEMPORARY TABLE gen (k STRING, v STRING) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY VIEW gen_short AS
      SELECT SUBSTR(k, 0, 4) AS k, SUBSTR(v, 0, 4) AS v FROM gen;
      
      INSERT INTO tmp_1
      SELECT MAP[k, v] FROM gen_short; 

      Printing the content of the tmp-1 topic results in the following output:

      $ kafka-console-consumer --bootstrap-server ... --from-beginning --topic tmp-1 | head -n 5
      {"m":{"8a93":"6102"}}
      {"m":{"8a93":"6102","7922":"f737"}}
      {"m":{"8a93":"6102","7922":"f737","9b63":"15b0"}}
      {"m":{"8a93":"6102","7922":"f737","9b63":"15b0","c38b":"b55c"}}
      {"m":{"8a93":"6102","7922":"f737","9b63":"15b0","c38b":"b55c","222c":"f3e2"}}
      

      As you can see, the maps are not correctly encoded as JSON: each row produces a map with a single key/value pair (MAP[k, v]), yet every record written to Kafka also contains the entries of all previously written records.

      I've run the query with the Blink planner, with object reuse and operator pipelining disabled.
      Writing the same data with the Avro format works as expected.

      Hence I assume that the JSON encoder/serializer reuses the Map object when encoding the JSON.
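
      To illustrate the suspected cause, here is a minimal, hypothetical Jackson-based sketch
      (not Flink's actual serialization code; the class and method names are made up for this
      example) of how reusing a single ObjectNode for the map field, without clearing it between
      records, would reproduce exactly the growing maps shown above:

      import com.fasterxml.jackson.databind.ObjectMapper;
      import com.fasterxml.jackson.databind.node.ObjectNode;

      import java.util.List;
      import java.util.Map;

      // Hypothetical sketch, NOT Flink's JSON format implementation.
      // It only shows how a converter that caches a node and never clears it
      // between records produces accumulating map entries.
      public class MapReuseSketch {

          private static final ObjectMapper MAPPER = new ObjectMapper();

          // Reused across records and never cleared -> entries pile up.
          private static final ObjectNode REUSED_MAP_NODE = MAPPER.createObjectNode();

          static String encodeBuggy(Map<String, String> m) {
              ObjectNode root = MAPPER.createObjectNode();
              m.forEach(REUSED_MAP_NODE::put);   // adds on top of the previous records' entries
              root.set("m", REUSED_MAP_NODE);
              return root.toString();
          }

          static String encodeFixed(Map<String, String> m) {
              ObjectNode root = MAPPER.createObjectNode();
              ObjectNode mapNode = MAPPER.createObjectNode(); // fresh node per record
              m.forEach(mapNode::put);
              root.set("m", mapNode);
              return root.toString();
          }

          public static void main(String[] args) {
              List<Map<String, String>> records = List.of(
                      Map.of("8a93", "6102"),
                      Map.of("7922", "f737"),
                      Map.of("9b63", "15b0"));

              // Prints growing maps, matching the observed Kafka output.
              records.forEach(r -> System.out.println(encodeBuggy(r)));

              // Prints one single-entry map per record, which is what I would expect.
              records.forEach(r -> System.out.println(encodeFixed(r)));
          }
      }

      If this assumption is correct, the fix would presumably amount to clearing or re-creating the
      per-record node before converting the map, analogous to encodeFixed above.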

       

       
       



            People

            • Assignee:
              Benchao Li (libenchao)
            • Reporter:
              Fabian Hueske (fhueske)
