  1. Flink
  2. FLINK-22541

Add filter params to the JSON format

    Details

      Description

      In my case, one Kafka topic stores data for multiple tables, for example:
      {"id":"121","source":"users","content":{"name":"test01","age":20,"addr":"addr1"}}
      {"id":"122","source":"users","content":{"name":"test02","age":23,"addr":"addr2"}}
      {"id":"124","source":"users","content":{"name":"test03","age":34,"addr":"addr3"}}
      {"id":"124","source":"order","content":{"orderId":"100001","price":34,"addr":"addr1231"}}
      {"id":"125","source":"order","content":{"orderId":"100002","price":34,"addr":"addr1232"}}
      {"id":"126","source":"order","content":{"orderId":"100003","price":34,"addr":"addr1233"}}
       
      I only want to consume the data for the order table, with a Flink SQL DDL like this:
      CREATE TABLE `order` (
        orderId STRING,
        price INT,
        addr STRING
      )
      WITH (
        'connector'='kafka',
        'topic'='kafkatopic',
        'properties.bootstrap.servers'='localhost:9092',
        'properties.group.id'='testGroup',
        'scan.startup.mode'='earliest-offset',
        'format'='json',
        'path-filter'='$[?(@.source=="order")]',
        'path-data'='$.content'
      );
       
      path-filter and path-data are the proposed new options. Both would accept JsonPath expressions (https://github.com/json-path/JsonPath): path-filter selects which records to keep, and path-data selects the part of each record that is mapped to the table columns.
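
The intended behavior of the two proposed options can be sketched in plain Python. This is only an illustration of the semantics suggested by the example above, not Flink code and not a JsonPath implementation: the helper name `select_rows` is hypothetical, and the filter is hard-coded as an equality check on the `source` field followed by extraction of the `content` object.

```python
import json

# A few of the sample records from the description above.
RAW_MESSAGES = [
    '{"id":"121","source":"users","content":{"name":"test01","age":20,"addr":"addr1"}}',
    '{"id":"124","source":"order","content":{"orderId":"100001","price":34,"addr":"addr1231"}}',
    '{"id":"125","source":"order","content":{"orderId":"100002","price":34,"addr":"addr1232"}}',
]


def select_rows(raw_messages, source_name):
    """Hypothetical helper mimicking the two proposed options.

    Step 1 (filter): keep only records whose "source" equals source_name.
    Step 2 (data):   return the nested "content" object as the row.
    """
    rows = []
    for raw in raw_messages:
        record = json.loads(raw)
        if record.get("source") == source_name:
            rows.append(record["content"])
    return rows


order_rows = select_rows(RAW_MESSAGES, "order")
for row in order_rows:
    print(row)
```

With the sample data, only the two order records pass the filter, and each resulting row carries just the orderId, price, and addr fields that the table declares.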
       

        Attachments

        1. image-2021-05-06-13-38-37-284.png
          331 kB
          sandy du
        2. image-2021-05-06-13-37-49-160.png
          120 kB
          sandy du

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              sandyfog sandy du