Flink / FLINK-34166

KeyedLookupJoinWrapper incorrectly processes delete message for inner join when previous lookup result is empty

    Description

      KeyedLookupJoinWrapper (used when 'table.optimizer.non-deterministic-update.strategy' is set to 'TRY_RESOLVE' and the lookup join has a non-deterministic update (NDU) problem) incorrectly processes a delete message for an inner join when the previous lookup result is empty.

      The intermediate delete result

              expectedOutput.add(deleteRecord(3, "c", null, null));
      

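      in the current test case KeyedLookupJoinHarnessTest#testTemporalInnerJoinWithFilterLookupKeyContainsPk is incorrect. The earlier insertRecord(3, "c") produced an empty lookup result, so the inner join emitted no row for it and there is nothing to retract: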

          @Test
          public void testTemporalInnerJoinWithFilterLookupKeyContainsPk() throws Exception {
              OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                      createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER, true);
      
              testHarness.open();
      
              testHarness.processElement(insertRecord(1, "a"));
              testHarness.processElement(insertRecord(2, "b"));
              testHarness.processElement(insertRecord(3, "c"));
              testHarness.processElement(insertRecord(4, "d"));
              testHarness.processElement(insertRecord(5, "e"));
              testHarness.processElement(updateBeforeRecord(3, "c"));
              testHarness.processElement(updateAfterRecord(3, "c2"));
              testHarness.processElement(deleteRecord(3, "c2"));
              testHarness.processElement(insertRecord(3, "c3"));
      
              List<Object> expectedOutput = new ArrayList<>();
              expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
              expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
              expectedOutput.add(deleteRecord(3, "c", null, null));
              expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
              expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
              expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
      
              assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
              testHarness.close();
          }
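
      To illustrate the behavior one would expect from an inner join here, the following is a minimal, self-contained sketch. It is not Flink's KeyedLookupJoinWrapper. The class InnerLookupJoinSketch, its methods accumulate/retract, and the in-memory LOOKUP map are hypothetical stand-ins, with lookup rows copied from the expected output of the test above. It assumes the wrapper remembers the last lookup result per key and, on a retraction, emits a delete only if that remembered result was non-empty.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InnerLookupJoinSketch {

    // Hypothetical lookup table keyed by "id,name"; rows mirror the test's expected output.
    // Keys that are absent model an empty lookup result (for example after the filter).
    private static final Map<String, String> LOOKUP = new HashMap<>();
    static {
        LOOKUP.put("1,a", "1,Julian");
        LOOKUP.put("4,d", "4,Fabian");
        LOOKUP.put("3,c2", "6,Jark-2");
        LOOKUP.put("3,c3", "9,Jark-3");
    }

    // Per-key state: right-side row joined by the last accumulate message,
    // or null when that lookup returned nothing.
    private final Map<Integer, String> lastLookup = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Handles INSERT and UPDATE_AFTER: look up, remember the result, emit only on a match.
    void accumulate(int id, String name) {
        String right = LOOKUP.get(id + "," + name);
        lastLookup.put(id, right); // HashMap permits null values
        if (right != null) {
            output.add("+I(" + id + "," + name + "," + right + ")");
        }
    }

    // Handles UPDATE_BEFORE and DELETE: retract only what was actually emitted before.
    void retract(int id, String name) {
        String right = lastLookup.remove(id);
        if (right != null) {
            output.add("-D(" + id + "," + name + "," + right + ")");
        }
        // Inner join with an empty previous lookup: emit nothing, no null-padded delete.
    }

    // Replays the same input sequence as the test above.
    public static List<String> run() {
        InnerLookupJoinSketch join = new InnerLookupJoinSketch();
        join.accumulate(1, "a");
        join.accumulate(2, "b");
        join.accumulate(3, "c");
        join.accumulate(4, "d");
        join.accumulate(5, "e");
        join.retract(3, "c");     // updateBeforeRecord(3, "c")
        join.accumulate(3, "c2"); // updateAfterRecord(3, "c2")
        join.retract(3, "c2");    // deleteRecord(3, "c2")
        join.accumulate(3, "c3");
        return join.output;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

      Under these assumptions the replay yields five records and no delete for (3, "c"), because no row was ever emitted for that record. A left outer join would behave differently, since it would have emitted a null-padded row that must later be retracted.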
      

          People

            lincoln.86xy lincoln lee