Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.12.2, 1.14.3
-
Flink 1.12.2 and Flink 1.14.3 test results are sometimes wrong
Description
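Running the same Flink SQL Top-N query (ROW_NUMBER() ... WHERE rno = 1 over a left join, followed by a GROUP BY) several times on identical input produces different final results: sometimes the result is correct and sometimes it is incorrect.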
Example:
@Test public void flinkSqlJoinRetract() { EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); streamEnv.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings); tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000)); RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo(); RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo(); SourceFunction<Row> waybillSourceFunction = buildWaybillStreamSource(waybillTableTypeInfo); SourceFunction<Row> itemSourceFunction = buildItemStreamSource(itemTableTypeInfo); String waybillTable = "waybill"; String itemTable = "item"; DataStreamSource<Row> waybillStream = streamEnv.addSource( waybillSourceFunction, waybillTable, waybillTableTypeInfo); DataStreamSource<Row> itemStream = streamEnv.addSource( itemSourceFunction, itemTable, itemTableTypeInfo); Expression[] waybillFields = ExpressionParser .parseExpressionList(String.join(",", waybillTableTypeInfo.getFieldNames()) + ",proctime.proctime").toArray(new Expression[0]); Expression[] itemFields = ExpressionParser .parseExpressionList( String.join(",", itemTableTypeInfo.getFieldNames()) + ",proctime.proctime") .toArray(new Expression[0]); tableEnv.createTemporaryView(waybillTable, waybillStream, waybillFields); tableEnv.createTemporaryView(itemTable, itemStream, itemFields); String sql = "select \n" + " city_id, \n" + " count(*) as cnt\n" + "from (\n" + " select id,city_id\n" + " from (\n" + " select \n" + " id,\n" + " city_id,\n" + " row_number() over(partition by id order by utime desc ) as rno \n" + " from (\n" + " select \n" + " waybill.id as id,\n" + " coalesce(item.city_id, waybill.city_id) as city_id,\n" + " waybill.utime as utime \n" + " from waybill left join item \n" + " on waybill.id = item.id \n" + " ) \n" + " )\n" + " where rno =1\n" + ")\n" + "group 
by city_id"; StatementSet statementSet = tableEnv.createStatementSet(); Table table = tableEnv.sqlQuery(sql); DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream(table, Row.class); rowDataStream.printToErr(); try { streamEnv.execute(); } catch (Exception e) { e.printStackTrace(); } } private static RowTypeInfo buildWaybillTableTypeInfo() { TypeInformation[] types = new TypeInformation[]{Types.INT(), Types.STRING(), Types.LONG(), Types.LONG()}; String[] fields = new String[]{"id", "city_id", "rider_id", "utime"}; return new RowTypeInfo(types, fields); } private static RowTypeInfo buildItemTableTypeInfo() { TypeInformation[] types = new TypeInformation[]{Types.INT(), Types.STRING(), Types.LONG()}; String[] fields = new String[]{"id", "city_id", "utime"}; return new RowTypeInfo(types, fields); } //id,rider_id,city_id,utime private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo rowTypeInfo) { return new SourceFunction<Row>() { private volatile boolean stopped = false; int count = 0; int[] ids = {111, 222, 333, 111}; String[] cityIds = {"A", "A", "B", "A"}; @Override public void run(SourceContext<Row> ctx) throws Exception { while (!stopped) { int id = ids[count % ids.length]; String cityId = cityIds[count % cityIds.length]; Row row = new Row(4); row.setField(0, id); row.setField(1, cityId); row.setField(2, (long) RandomUtils.nextInt(1000, 2000)); row.setField(3, System.currentTimeMillis()); printRow(rowTypeInfo, row); ctx.collect(row); if (++count > 3) { stopped = true; } } } @Override public void cancel() { stopped = true; } }; } //id,city_id,utime private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo rowTypeInfo) { return new SourceFunction<Row>() { private volatile boolean stopped = false; int count = 0; int[] ids = {111, 333}; String[] cityIds = {"C", "D"}; @Override public void run(SourceContext<Row> ctx) throws Exception { while (!stopped) { Thread.sleep(RandomUtils.nextInt(1000, 2000)); int id = ids[count % 
ids.length]; String cityId = cityIds[count % cityIds.length]; Row row = new Row(3); row.setField(0, id); row.setField(1, cityId); //row.setField(2, System.currentTimeMillis()); printRow(rowTypeInfo, row); ctx.collect(row); if (++count >= 2) { stopped = true; } } } @Override public void cancel() { stopped = true; } }; } public static void printRow(RowTypeInfo rowTypeInfo, Row row) { String prefix = ""; for (int i = 0; i < rowTypeInfo.getArity(); ++i) { prefix = i > 0 ? "," : ""; System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" + row.getField(i)); } System.out.println(); }
------------------------------------------------------------
wrong result | right result |
---|---|
id:111,city_id:A,rider_id:1137,utime:1650979957702 id:222,city_id:A,rider_id:1976,utime:1650979957725 id:333,city_id:B,rider_id:1916,utime:1650979957725 id:111,city_id:A,rider_id:1345,utime:1650979957725 (true,A,1) (false,A,1) (true,A,2) (true,B,1) (false,A,2) (true,A,1) (false,A,1) (true,A,2) id:111,city_id:C,utime:null (false,A,2) (true,A,1) (true,C,1) (false,A,1) (false,C,1) (true,C,2) id:333,city_id: D,utime:null (false,B,1) (true,D,1) The final result: C,2 D,1 is wrong. |
id:111,city_id:A,rider_id:1155,utime:1650980662019 id:222,city_id:A,rider_id:1875,utime:1650980662042 id:333,city_id:B,rider_id:1430,utime:1650980662042 id:111,city_id:A,rider_id:1308,utime:1650980662042 (true,A,1) (false,A,1) (true,A,2) (true,B,1) (false,A,2) (true,A,1) (false,A,1) (true,A,2) id:111,city_id:C,utime:null (false,A,2) (true,A,1) (false,A,1) (true,A,2) (false,A,2) (true,A,1) (true,C,1) id:333,city_id: D,utime:null (false,B,1) (true,D,1) The final result: A,1 C,1 D,1 is right. |
Attachments
Issue Links
- links to