FLINK-28019

Error in RetractableTopNFunction when retracting a stale record with state ttl enabled


Details

    Description

      An error occurs in RetractableTopNFunction when a stale record is retracted while state TTL is enabled. A test case that reproduces it:

          @Test
          public void testRetractAnStaledRecordWithRowNumber() throws Exception {
              StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
              AbstractTopNFunction func =
                      new RetractableTopNFunction(
                              ttlConfig,
                              InternalTypeInfo.ofFields(
                                      VarCharType.STRING_TYPE, new BigIntType(), new IntType()),
                              comparableRecordComparator,
                              sortKeySelector,
                              RankType.ROW_NUMBER,
                              new ConstantRankRange(1, 2),
                              generatedEqualiser,
                              true,
                              true);
      
              OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
              testHarness.open();
              testHarness.setStateTtlProcessingTime(0);
              testHarness.processElement(insertRecord("a", 1L, 10));
              testHarness.setStateTtlProcessingTime(1001);
              testHarness.processElement(insertRecord("a", 2L, 11));
              testHarness.processElement(deleteRecord("a", 1L, 10));
              testHarness.close();
      
              List<Object> expectedOutput = new ArrayList<>();
              expectedOutput.add(insertRecord("a", 1L, 10, 1L));
              expectedOutput.add(insertRecord("a", 2L, 11, 1L));
              // the following delete record should not be sent because the left row is null which is
              // illegal.
              // -D{row1=null, row2=+I(1)};
      
              assertorWithRowNumber.assertOutputEquals(
                      "output wrong.", expectedOutput, testHarness.getOutput());
          }
      

      The cause is an incomplete code path for handling stale records: retracting a record whose state has already expired produces a delete message whose row is null.
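
The expected behavior in the test above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the actual fix: `StaleRetractSketch`, `TtlState`, and `replay` are hypothetical names, the code uses no Flink classes, rows are plain strings, and rank numbers are omitted. It shows one way a retraction for an expired record could be skipped instead of emitting a delete built from a missing row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model; it does not use or mirror Flink's real classes.
class StaleRetractSketch {

    /** Toy stand-in for TTL-backed state: entries older than ttlMillis read as absent. */
    static final class TtlState {
        private final long ttlMillis;
        private final Map<String, Long> writtenAt = new HashMap<>();
        private long now;

        TtlState(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        void setTime(long now) {
            this.now = now;
        }

        void put(String row) {
            writtenAt.put(row, now);
        }

        /** Returns true only if the row was stored and has not yet expired. */
        boolean isLive(String row) {
            Long t = writtenAt.get(row);
            return t != null && now - t < ttlMillis;
        }

        void remove(String row) {
            writtenAt.remove(row);
        }
    }

    private final TtlState state;
    private final List<String> output = new ArrayList<>();

    StaleRetractSketch(long ttlMillis) {
        this.state = new TtlState(ttlMillis);
    }

    void insert(String row) {
        state.put(row);
        output.add("+I(" + row + ")");
    }

    void delete(String row) {
        if (!state.isLive(row)) {
            // Stale record: its state expired, so there is no stored row to retract.
            // Skip it rather than emit a delete built from a missing (null) row.
            state.remove(row);
            return;
        }
        state.remove(row);
        output.add("-D(" + row + ")");
    }

    /** Replays the event sequence from the reproduction test (TTL of 1000 ms). */
    public static List<String> replay() {
        StaleRetractSketch op = new StaleRetractSketch(1_000);
        op.state.setTime(0);
        op.insert("a,1,10");
        op.state.setTime(1001);
        op.insert("a,2,11");
        op.delete("a,1,10"); // arrives after the first row's state has expired
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

In this model the late delete for the first row produces no output, which matches the two insert records the test expects. How the real operator should detect and handle the stale record is not specified in this report.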

            People

              lincoln.86xy lincoln lee