Details
-
Improvement
-
Status: Open
-
Minor
-
Resolution: Unresolved
-
1.11.0, 1.12.0
-
None
-
None
Description
In my case,one kafka topic store multiple table data,for example:
{"id":"121","source":"users","content":{"name":"test01","age":20,"addr":"addr1"}}
{"id":"122","source":"users","content":{"name":"test02","age":23,"addr":"addr2"}}
{"id":"124","source":"users","content":{"name":"test03","age":34,"addr":"addr3"}}
{"id":"124","source":"order","content":{"orderId":"100001","price":34,"addr":"addr1231"}}
{"id":"125","source":"order","content":{"orderId":"100002","price":34,"addr":"addr1232"}}
{"id":"126","source":"order","content":{"orderId":"100003","price":34,"addr":"addr1233"}}
I just want to consume data from talbe order,flink sql ddl like this:
CREATE TABLE order (
orderId STRING,
age INT,
addr STRING
)
with (
'connector'='kafka',
'topic'='kafkatopic',
'properties.bootstrap.servers'='localhost:9092',
'properties.group.id'='testGroup',
'scan.startup.mode'='earliest-offset',
'format'='json',
'path-fliter'='$[?(@.source=="order")]',
'path-data'='$.content'
);
path-fliter and path-data can use JsonPath (https://github.com/json-path/JsonPath)