Details
-
Improvement
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.16.1
-
None
Description
-- flink version:1.16.1 -- parallelism.default: 1 CREATE TABLE s1( id string, gk bigint, price int )WITH( 'connector' = 'kafka' ,'properties.bootstrap.servers' = 'xx:9092' ,'properties.group.id' = 'xx-xx' ,'scan.startup.mode' = 'earliest-offset' ,'value.format' = 'json' ,'topic' = 'topic1' ); CREATE TABLE s2( id string, name string )WITH( 'connector' = 'kafka' ,'properties.bootstrap.servers' = 'xx:9092' ,'properties.group.id' = 'xx-xx' ,'scan.startup.mode' = 'earliest-offset' ,'value.format' = 'json' ,'topic' = 'topic2' ); create table sink( id string, name string, gk bigint, price int )with( 'connector'='print' ); create view v1 as select id, gk, last_value(price) price from s1 group by id,gk; insert into sink select v1.id, s2.name, v1.gk, v1.price from v1 left join s2 on v1.id=s2.id; -- 1.Enter two pieces of data into the topic1 topic: -- {"id":"1","gk":758,"price":100} -- {"id":"1","gk":1818,"price":200} -- The output is as follows: -- +I[1, null, 758, 100] -- +I[1, null, 1818, 200] -- 2.Enter two pieces of data into the topic2 topic: -- {"id":1,"name":"z3"} -- The output is as follows: -- -D[1, null, 1818, 200] -- -D[1, null, 758, 100] -- +I[1, z3, 1818, 200] -- +I[1, z3, 758, 100] -- My doubt is that the output should be in the order of input , like below: -- -D[1, null, 758, 100] -- -D[1, null, 1818, 200] -- +I[1, z3, 758, 100] -- +I[1, z3, 1818, 200] -- 3.When I re-run the above sql, the results are output in the order of input -- +I[1, z3, 758, 100] -- +I[1, z3, 1818, 200] -- Is there a way to control this uncertainty?
Attachments
Issue Links
- is related to
-
FLINK-31729 Unexpected UPDATE_BEFORE output record in LEFT OUTER JOIN
- Open