  1. Flink
  2. FLINK-22541

Add JSON format filter params


Details

    Description

      In my case, one Kafka topic stores data for multiple tables, for example:
      {"id":"121","source":"users","content":{"name":"test01","age":20,"addr":"addr1"}}
      {"id":"122","source":"users","content":{"name":"test02","age":23,"addr":"addr2"}}
      {"id":"124","source":"users","content":{"name":"test03","age":34,"addr":"addr3"}}
      {"id":"124","source":"order","content":{"orderId":"100001","price":34,"addr":"addr1231"}}
      {"id":"125","source":"order","content":{"orderId":"100002","price":34,"addr":"addr1232"}}
      {"id":"126","source":"order","content":{"orderId":"100003","price":34,"addr":"addr1233"}}
       
      I just want to consume data from the order table, with a Flink SQL DDL like this:
      CREATE TABLE `order` (
      orderId STRING,
      price INT,
      addr STRING
      )
      WITH (
      'connector'='kafka',
      'topic'='kafkatopic',
      'properties.bootstrap.servers'='localhost:9092',
      'properties.group.id'='testGroup',
      'scan.startup.mode'='earliest-offset',
      'format'='json',
      'path-filter'='$[?(@.source=="order")]',
      'path-data'='$.content'
      );
       
      The proposed path-filter and path-data options would accept JsonPath expressions (https://github.com/json-path/JsonPath): path-filter selects which records to keep, and path-data selects the sub-object that is mapped to the table columns.
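
To make the requested behaviour concrete, here is a minimal sketch of what the two proposed options would be expected to do with the sample records above. It is plain Python using only the standard library, not Flink code and not the JsonPath library; the function name `select_rows` is hypothetical, and the equality check plus the `content` lookup stand in for the filter expression `$[?(@.source=="order")]` and the data path `$.content`.

```python
import json

# Sample records copied from the description above.
RAW_MESSAGES = [
    '{"id":"121","source":"users","content":{"name":"test01","age":20,"addr":"addr1"}}',
    '{"id":"122","source":"users","content":{"name":"test02","age":23,"addr":"addr2"}}',
    '{"id":"124","source":"users","content":{"name":"test03","age":34,"addr":"addr3"}}',
    '{"id":"124","source":"order","content":{"orderId":"100001","price":34,"addr":"addr1231"}}',
    '{"id":"125","source":"order","content":{"orderId":"100002","price":34,"addr":"addr1232"}}',
    '{"id":"126","source":"order","content":{"orderId":"100003","price":34,"addr":"addr1233"}}',
]


def select_rows(raw_messages, source_name):
    """Hypothetical helper: keep records whose "source" matches, return their "content"."""
    rows = []
    for raw in raw_messages:
        record = json.loads(raw)
        # Stand-in for the proposed filter option: drop records from other tables.
        if record.get("source") != source_name:
            continue
        # Stand-in for the proposed data-path option: only the nested object
        # is handed on to be mapped to table columns.
        rows.append(record.get("content"))
    return rows


order_rows = select_rows(RAW_MESSAGES, "order")
for row in order_rows:
    print(row)
```

Under these assumptions only the three order records survive, and each row carries just the orderId, price and addr fields that the table columns would be read from.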
       

      People

        Assignee: Unassigned
        Reporter: sandy du (sandyfog)
