Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-35184

Hash collision inside MiniBatchStreamingJoin operator

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      The hash collision is possible for InputSideHasNoUniqueKeyBundle. To reproduce it just launch the following test within StreamingMiniBatchJoinOperatorTest:

       

      @Tag("miniBatchSize=6")
      @Test
      public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) throws Exception {
      
          leftTypeInfo =
                  InternalTypeInfo.of(
                          RowType.of(
                                  new LogicalType[] {new IntType(), new BigIntType()},
                                  new String[] {"id1", "val1"}));
      
          rightTypeInfo =
                  InternalTypeInfo.of(
                          RowType.of(
                                  new LogicalType[] {new IntType(), new BigIntType()},
                                  new String[] {"id2", "val2"}));
      
          leftKeySelector =
                  HandwrittenSelectorUtil.getRowDataSelector(
                          new int[] {0},
                          leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
          rightKeySelector =
                  HandwrittenSelectorUtil.getRowDataSelector(
                          new int[] {0},
                          rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));
      
          joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
      
          super.beforeEach(testInfo);
      
          testHarness.setStateTtlProcessingTime(1);
          testHarness.processElement2(insertRecord(1, 1L));
          testHarness.processElement1(insertRecord(1, 4294967296L));
          testHarness.processElement2(insertRecord(1, 4294967296L));
          testHarness.processElement2(deleteRecord(1, 1L));
      
          testHarness.close();
      
          assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 4294967296L, 1, 4294967296L));
      } 

       

       

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            rovboyko Roman Boyko
            rovboyko Roman Boyko
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment