Flink / FLINK-16639

Flink SQL Kafka source connector: add a parameter to filter non-JSON records when format.type is json

Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Invalid
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

      In my experience, the Kafka source connector is one of the most frequently used connectors in Flink SQL. It supports JSON, CSV, and other data formats, but there is a problem with the JSON format. For example, consider a Flink SQL Kafka source DDL like this:
      CREATE TABLE team_olap_table (
        a VARCHAR,
        b VARCHAR
      )
      WITH (
        -- Kafka source settings
        'connector.type' = 'kafka',
        'connector.version' = '0.10',
        'connector.topic' = 'topics',
        'connector.properties.0.key' = 'group.id',
        'connector.properties.0.value' = 'hello_world',
        'connector.properties.1.key' = 'bootstrap.servers',
        'connector.properties.1.value' = 'xxx',
        'connector.property-version' = '1',
        'connector.startup-mode' = 'latest-offset',
        -- JSON format, with the schema derived from the table columns
        'format.type' = 'json',
        'format.property-version' = '1',
        'format.derive-schema' = 'true',
        'update-mode' = 'append'
      );

      If even one or two messages in the Kafka topic are not valid JSON, the Flink SQL job fails over repeatedly.
      To solve this, I propose adding a 'format.fail-on-not-json-record' parameter to the flink-json module for sources that use the JSON format. If the parameter is true (the default), the job fails when it reads a non-JSON record. If it is false, the job filters out non-JSON records and keeps running normally.
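
The proposed semantics can be illustrated with a minimal sketch outside of Flink. This is not Flink code and not the flink-json implementation: the function name `deserialize_records` and its `fail_on_not_json_record` argument are hypothetical stand-ins for the proposed 'format.fail-on-not-json-record' parameter, and the records are plain byte strings rather than Kafka messages.

```python
import json
from typing import Any, Iterable, Iterator


def deserialize_records(
    raw_records: Iterable[bytes],
    fail_on_not_json_record: bool = True,
) -> Iterator[Any]:
    """Parse raw records as JSON, mimicking the proposed flag.

    Hypothetical helper for illustration only; it is not part of Flink.
    """
    for raw in raw_records:
        try:
            row = json.loads(raw)
        except (json.JSONDecodeError, UnicodeDecodeError):
            if fail_on_not_json_record:
                # Default behaviour: a single bad record fails the whole read.
                raise
            # Flag disabled: drop the bad record and keep going.
            continue
        yield row


if __name__ == "__main__":
    messages = [b'{"a": "1", "b": "2"}', b"not json", b'{"a": "3", "b": "4"}']
    # With the flag disabled, the malformed record is filtered out.
    for row in deserialize_records(messages, fail_on_not_json_record=False):
        print(row)
```

With the default (`True`), the same input raises a `json.JSONDecodeError` at the second record, which corresponds to the repeated fail-over described above.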

          People

            Assignee: Unassigned
            Reporter: LakeShen (shenlang)
            Votes: 0
            Watchers: 2
