Details
- Bug
- Status: Closed
- Major
- Resolution: Fixed
- 1.14.4, 1.15.0
- None
Description
We found that an error occurs when retracting a stale record in RetractableTopNFunction with state TTL enabled. A test case that reproduces it:
    @Test
    public void testRetractAnStaledRecordWithRowNumber() throws Exception {
        StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
        AbstractTopNFunction func =
                new RetractableTopNFunction(
                        ttlConfig,
                        InternalTypeInfo.ofFields(
                                VarCharType.STRING_TYPE, new BigIntType(), new IntType()),
                        comparableRecordComparator,
                        sortKeySelector,
                        RankType.ROW_NUMBER,
                        new ConstantRankRange(1, 2),
                        generatedEqualiser,
                        true,
                        true);

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
        testHarness.open();
        testHarness.setStateTtlProcessingTime(0);
        testHarness.processElement(insertRecord("a", 1L, 10));
        testHarness.setStateTtlProcessingTime(1001);
        testHarness.processElement(insertRecord("a", 2L, 11));
        testHarness.processElement(deleteRecord("a", 1L, 10));
        testHarness.close();

        List<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(insertRecord("a", 1L, 10, 1L));
        expectedOutput.add(insertRecord("a", 2L, 11, 1L));
        // the following delete record should not be sent because the left row is null which is
        // illegal.
        // -D{row1=null, row2=+I(1)};

        assertorWithRowNumber.assertOutputEquals(
                "output wrong.", expectedOutput, testHarness.getOutput());
    }
The cause is an incomplete code path for handling stale records.
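To make the expected behavior concrete, here is a minimal, self-contained sketch of a retract path that tolerates expired state. It is not Flink code and not the actual fix: the class StaleRetractSketch, its fields, and the "+I(..)"/"-D(..)" string output are all hypothetical stand-ins for the operator, its TTL-backed state, and its emitted rows. It assumes an entry is expired once the TTL has fully elapsed since its last write.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a retract path over TTL'd state; not Flink's implementation. */
class StaleRetractSketch {
    private final long ttlMillis;
    private long now;
    // sort key -> time of last write; stands in for TTL-backed keyed state
    private final Map<Long, Long> writeTimeBySortKey = new HashMap<>();
    final List<String> output = new ArrayList<>();

    StaleRetractSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void setProcessingTime(long now) {
        this.now = now;
    }

    void insert(long sortKey) {
        expire();
        writeTimeBySortKey.put(sortKey, now);
        output.add("+I(" + sortKey + ")");
    }

    void retract(long sortKey) {
        expire();
        // If the record's state has already expired there is nothing to retract,
        // so no delete is emitted instead of building a row from missing state.
        if (writeTimeBySortKey.remove(sortKey) == null) {
            return;
        }
        output.add("-D(" + sortKey + ")");
    }

    // Assumption: an entry is dropped once ttlMillis has elapsed since its last write.
    private void expire() {
        writeTimeBySortKey.values().removeIf(t -> now - t >= ttlMillis);
    }

    public static void main(String[] args) {
        StaleRetractSketch sketch = new StaleRetractSketch(1_000);
        sketch.setProcessingTime(0);
        sketch.insert(1L);
        sketch.setProcessingTime(1001);
        sketch.insert(2L);
        sketch.retract(1L); // stale by now, so no "-D(1)" is emitted
        System.out.println(sketch.output); // prints [+I(1), +I(2)]
    }
}
```

The sequence in main mirrors the test case above: the first record is written at time 0, expires before time 1001, and its later retraction produces no output.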