Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-27418

Flink SQL TopN result is wrong

    XMLWordPrintableJSON

Details

    Description

      Running the same Flink SQL TopN job several times produces different results: some runs give the correct result and others give an incorrect one.

      Example:

      /**
       * Reproducer for FLINK-27418: a TopN ({@code row_number() ... = 1}) deduplication on top of a
       * retracting left join, followed by a per-city count, sometimes yields a wrong final result.
       *
       * <p>Two bounded sources ({@code waybill} and {@code item}) are registered as temporary views,
       * each with an extra processing-time attribute {@code proctime}. The query joins them on
       * {@code id}, takes {@code coalesce(item.city_id, waybill.city_id)} as the city, keeps one row
       * per waybill {@code id} (highest {@code utime}), and counts the remaining ids per city. The
       * resulting retract stream is printed to stderr.
       *
       * <p>For the data emitted by the two sources, the expected final counts are {@code A,1},
       * {@code C,1} and {@code D,1}.
       *
       * @throws IllegalStateException if executing the Flink job fails; the cause is rethrown so a
       *     failing job fails the test instead of only being printed
       */
      @Test
          public void flinkSqlJoinRetract() {
              EnvironmentSettings settings = EnvironmentSettings.newInstance()
                      .useBlinkPlanner()
                      .inStreamingMode()
                      .build();

              StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
              // One subtask per operator.
              streamEnv.setParallelism(1);
              StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
              // Idle state is retained for 10000 s, far longer than this short test runs.
              tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000));

              RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
              RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
              SourceFunction<Row> waybillSourceFunction = buildWaybillStreamSource(waybillTableTypeInfo);
              SourceFunction<Row> itemSourceFunction = buildItemStreamSource(itemTableTypeInfo);
              String waybillTable = "waybill";
              String itemTable = "item";

              DataStreamSource<Row> waybillStream = streamEnv.addSource(
                      waybillSourceFunction,
                      waybillTable,
                      waybillTableTypeInfo);
              DataStreamSource<Row> itemStream = streamEnv.addSource(
                      itemSourceFunction,
                      itemTable,
                      itemTableTypeInfo);

              // Expose every declared field plus an appended processing-time attribute "proctime".
              Expression[] waybillFields = ExpressionParser
                      .parseExpressionList(String.join(",", waybillTableTypeInfo.getFieldNames())
                              + ",proctime.proctime").toArray(new Expression[0]);
              Expression[] itemFields = ExpressionParser
                      .parseExpressionList(
                              String.join(",", itemTableTypeInfo.getFieldNames()) + ",proctime.proctime")
                      .toArray(new Expression[0]);

              tableEnv.createTemporaryView(waybillTable, waybillStream, waybillFields);
              tableEnv.createTemporaryView(itemTable, itemStream, itemFields);

              // Innermost: waybill LEFT JOIN item, preferring the item's city when one exists.
              // Middle:    row_number() per id by utime desc; rno = 1 keeps one row per waybill id.
              // Outer:     count the surviving ids per city.
              // NOTE(review): utime comes from System.currentTimeMillis(), so two rows for the same
              // id can carry equal utime values; row_number() does not break such ties
              // deterministically.
              String sql =
                      "select \n"
                      + "    city_id, \n"
                      + "    count(*) as cnt\n"
                      + "from (\n"
                      + "    select id,city_id\n"
                      + "    from (\n"
                      + "        select \n"
                      + "            id,\n"
                      + "            city_id,\n"
                      + "            row_number() over(partition by id order by utime desc ) as rno \n"
                      + "        from (\n"
                      + "            select \n"
                      + "                waybill.id as id,\n"
                      + "                coalesce(item.city_id, waybill.city_id) as city_id,\n"
                      + "                waybill.utime as utime \n"
                      + "            from waybill left join item \n"
                      + "            on waybill.id = item.id \n"
                      + "        ) \n"
                      + "    )\n"
                      + "    where rno =1\n"
                      + ")\n"
                      + "group by city_id";

              Table table = tableEnv.sqlQuery(sql);
              DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream(table, Row.class);
              rowDataStream.printToErr();
              try {
                  streamEnv.execute();
              } catch (Exception e) {
                  // Propagate instead of swallowing: a failed job must fail the test.
                  throw new IllegalStateException("Flink job execution failed", e);
              }
          }
      
          /**
           * Builds the row type of the {@code waybill} source, in field order:
           * {@code id} (INT), {@code city_id} (STRING), {@code rider_id} (LONG), {@code utime} (LONG).
           *
           * @return the waybill row type with named fields
           */
          private static RowTypeInfo buildWaybillTableTypeInfo() {
              // Wildcard-typed array instead of a raw TypeInformation[]: avoids raw-type warnings.
              TypeInformation<?>[] types =
                      new TypeInformation<?>[]{Types.INT(), Types.STRING(), Types.LONG(), Types.LONG()};
              String[] fields = new String[]{"id", "city_id", "rider_id", "utime"};
              return new RowTypeInfo(types, fields);
          }
      
          /**
           * Builds the row type of the {@code item} source, in field order:
           * {@code id} (INT), {@code city_id} (STRING), {@code utime} (LONG).
           *
           * @return the item row type with named fields
           */
          private static RowTypeInfo buildItemTableTypeInfo() {
              // Wildcard-typed array instead of a raw TypeInformation[]: avoids raw-type warnings.
              TypeInformation<?>[] types =
                      new TypeInformation<?>[]{Types.INT(), Types.STRING(), Types.LONG()};
              String[] fields = new String[]{"id", "city_id", "utime"};
              return new RowTypeInfo(types, fields);
          }
      
          /**
           * Builds a bounded source that emits four waybill rows and then finishes. Row layout:
           * {@code id, city_id, rider_id, utime}.
           *
           * <p>Emitted rows, in order: (111, "A"), (222, "A"), (333, "B"), (111, "A"). Id 111 appears
           * twice, so the query's {@code row_number()} deduplication must keep only one of them.
           * {@code rider_id} is a random value from {@code RandomUtils.nextInt(1000, 2000)}.
           * {@code utime} is {@code System.currentTimeMillis()} at emission, so rows emitted in
           * quick succession can share the same {@code utime}.
           *
           * @param rowTypeInfo row type used only to label the fields when printing each row
           * @return a source function producing the waybill rows
           */
          private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo rowTypeInfo) {
              return new SourceFunction<Row>() {
                  private volatile boolean stopped = false;
                  private int count = 0;
                  private final int[] ids = {111, 222, 333, 111};
                  private final String[] cityIds = {"A", "A", "B", "A"};

                  @Override
                  public void run(SourceContext<Row> ctx) throws Exception {
                      while (!stopped) {
                          int id = ids[count % ids.length];
                          String cityId = cityIds[count % cityIds.length];
                          Row row = new Row(4);
                          row.setField(0, id);
                          row.setField(1, cityId);
                          row.setField(2, (long) RandomUtils.nextInt(1000, 2000));
                          row.setField(3, System.currentTimeMillis());
                          printRow(rowTypeInfo, row);
                          ctx.collect(row);
                          // Stop after the fourth row (count reaches 4).
                          if (++count > 3) {
                              stopped = true;
                          }
                      }
                  }

                  @Override
                  public void cancel() {
                      stopped = true;
                  }
              };
          }
      
          /**
           * Builds a bounded source that emits two item rows and then finishes. Row layout:
           * {@code id, city_id, utime}.
           *
           * <p>Emitted rows, in order: (111, "C") and (333, "D"). Before each row the source sleeps
           * for {@code RandomUtils.nextInt(1000, 2000)} milliseconds. The items therefore typically
           * arrive after the waybill rows, which makes the left join retract and re-emit its results.
           * The {@code utime} field is never set and stays null; the query only reads
           * {@code waybill.utime}.
           *
           * @param rowTypeInfo row type used only to label the fields when printing each row
           * @return a source function producing the item rows
           */
          private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo rowTypeInfo) {
              return new SourceFunction<Row>() {
                  private volatile boolean stopped = false;
                  int emitted = 0;
                  int[] itemIds = {111, 333};
                  String[] itemCities = {"C", "D"};

                  @Override
                  public void run(SourceContext<Row> ctx) throws Exception {
                      while (!stopped) {
                          Thread.sleep(RandomUtils.nextInt(1000, 2000));

                          // Both arrays have the same length, so one index selects the pair.
                          int slot = emitted % itemIds.length;
                          Row item = new Row(3);
                          item.setField(0, itemIds[slot]);
                          item.setField(1, itemCities[slot]);
                          // Field 2 (utime) is intentionally left null.

                          printRow(rowTypeInfo, item);
                          ctx.collect(item);

                          emitted++;
                          if (emitted >= 2) {
                              stopped = true;
                          }
                      }
                  }

                  @Override
                  public void cancel() {
                      stopped = true;
                  }
              };
          }
      
          /**
           * Prints a row to stdout as comma-separated {@code name:value} pairs on one line, for
           * example {@code id:111,city_id:C,utime:null}. Null field values print as {@code null}.
           *
           * <p>The line is assembled first and written with a single {@code println}. Both source
           * functions call this method and presumably run in separate task threads. Printing field
           * by field would let their output interleave in the middle of a line.
           *
           * @param rowTypeInfo type info supplying the field names, in field order
           * @param row the row whose first {@code rowTypeInfo.getArity()} fields are printed
           */
          public static void printRow(RowTypeInfo rowTypeInfo, Row row) {
              String[] fieldNames = rowTypeInfo.getFieldNames();
              StringBuilder line = new StringBuilder();
              for (int i = 0; i < rowTypeInfo.getArity(); ++i) {
                  if (i > 0) {
                      line.append(',');
                  }
                  line.append(fieldNames[i]).append(':').append(row.getField(i));
              }
              System.out.println(line);
          }
      
      

      ------------------------------------------------------------

      Output of two runs of the same job. First run (wrong result):
      id:111,city_id:A,rider_id:1137,utime:1650979957702
      id:222,city_id:A,rider_id:1976,utime:1650979957725
      id:333,city_id:B,rider_id:1916,utime:1650979957725
      id:111,city_id:A,rider_id:1345,utime:1650979957725
      (true,A,1)
      (false,A,1)
      (true,A,2)
      (true,B,1)
      (false,A,2)
      (true,A,1)
      (false,A,1)
      (true,A,2)
      id:111,city_id:C,utime:null
      (false,A,2)
      (true,A,1)
      (true,C,1)
      (false,A,1)
      (false,C,1)
      (true,C,2)
      id:333,city_id: D,utime:null
      (false,B,1)
      (true,D,1)
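      The final result:
      C,2
      D,1
      is wrong. The expected result is A,1 / C,1 / D,1:
      - waybill 111 joins item 111, so its city becomes C;
      - waybill 333 joins item 333, so its city becomes D;
      - waybill 222 has no matching item, so it keeps city A.
      In this run the count for A was retracted to zero and C was counted twice.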
       
      id:111,city_id:A,rider_id:1155,utime:1650980662019
      id:222,city_id:A,rider_id:1875,utime:1650980662042
      id:333,city_id:B,rider_id:1430,utime:1650980662042
      id:111,city_id:A,rider_id:1308,utime:1650980662042
      (true,A,1)
      (false,A,1)
      (true,A,2)
      (true,B,1)
      (false,A,2)
      (true,A,1)
      (false,A,1)
      (true,A,2)
      id:111,city_id:C,utime:null
      (false,A,2)
      (true,A,1)
      (false,A,1)
      (true,A,2)
      (false,A,2)
      (true,A,1)
      (true,C,1)
      id:333,city_id: D,utime:null
      (false,B,1)
      (true,D,1)
      The final result:
      A,1
      C,1
      D,1
      is right.

       

       

      Attachments

        Issue Links

          Activity

            People

              rovboyko Roman Boyko
              zhangbinzaifendou zhangbin
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: