Flink / FLINK-34378

Minibatch join disrupted the original order of input records

Details

    • Type: Technical Debt
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.19.0
    • Fix Version/s: 1.19.0
    • Component/s: Table SQL / Runtime
    • Labels: None

    Description

      I'm not sure whether this is a bug. The following case reproduces the situation.

      // add it in CalcITCase
      @Test
      def test(): Unit = {
        env.setParallelism(1)
        val rows = Seq(
          row(1, "1"),
          row(2, "2"),
          row(3, "3"),
          row(4, "4"),
          row(5, "5"),
          row(6, "6"),
          row(7, "7"),
          row(8, "8"))
        val dataId = TestValuesTableFactory.registerData(rows)
      
        val ddl =
          s"""
             |CREATE TABLE t1 (
             |  a int,
             |  b string
             |) WITH (
             |  'connector' = 'values',
             |  'data-id' = '$dataId',
             |  'bounded' = 'false'
             |)
           """.stripMargin
        tEnv.executeSql(ddl)
      
        val ddl2 =
          s"""
             |CREATE TABLE t2 (
             |  a int,
             |  b string
             |) WITH (
             |  'connector' = 'values',
             |  'data-id' = '$dataId',
             |  'bounded' = 'false'
             |)
           """.stripMargin
        tEnv.executeSql(ddl2)
      
        // Enable mini-batch: input records are buffered and flushed after
        // at most 5 seconds or once 20 records have accumulated.
        tEnv.getConfig.getConfiguration
          .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
        tEnv.getConfig.getConfiguration
          .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
        tEnv.getConfig.getConfiguration
          .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
      
        println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
      
        tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
      }

      Result with minibatch join enabled:

      +----+---+---+----+----+
      | op | a | b | a0 | b0 |
      +----+---+---+----+----+
      | +I | 3 | 3 |  3 |  3 |
      | +I | 7 | 7 |  7 |  7 |
      | +I | 2 | 2 |  2 |  2 |
      | +I | 5 | 5 |  5 |  5 |
      | +I | 1 | 1 |  1 |  1 |
      | +I | 6 | 6 |  6 |  6 |
      | +I | 4 | 4 |  4 |  4 |
      | +I | 8 | 8 |  8 |  8 |
      +----+---+---+----+----+

      With minibatch join disabled, the result is:

      +----+---+---+----+----+
      | op | a | b | a0 | b0 |
      +----+---+---+----+----+
      | +I | 1 | 1 |  1 |  1 |
      | +I | 2 | 2 |  2 |  2 |
      | +I | 3 | 3 |  3 |  3 |
      | +I | 4 | 4 |  4 |  4 |
      | +I | 5 | 5 |  5 |  5 |
      | +I | 6 | 6 |  6 |  6 |
      | +I | 7 | 7 |  7 |  7 |
      | +I | 8 | 8 |  8 |  8 |
      +----+---+---+----+----+
       

       

          Activity

            xuyangzhong xuyang added a comment -

            cc xu_shuai_ 

            lsy dalongliu added a comment (edited) -

            This behavior is by design; we only guarantee consistency of the final result.
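One way to read "consistency of the final result" for this case is that both runs emit the same rows, regardless of order. The sketch below compares the two outputs from the description in an order-insensitive way. The object and method names are hypothetical and not part of Flink, and the comparison only makes sense for an insert-only (`+I`) result like this one.

```scala
object FinalResultCheck {
  // Rows (a, b) in the order printed with mini-batch enabled, taken from the description.
  val miniBatchOrder: List[(Int, String)] =
    List(3, 7, 2, 5, 1, 6, 4, 8).map(a => (a, a.toString))

  // Rows (a, b) in the order printed with mini-batch disabled.
  val plainOrder: List[(Int, String)] =
    (1 to 8).toList.map(a => (a, a.toString))

  // Hypothetical helper: true when both outputs contain the same rows,
  // including duplicates, irrespective of emission order.
  def sameFinalResult[T: Ordering](actual: List[T], expected: List[T]): Boolean =
    actual.sorted == expected.sorted

  def main(args: Array[String]): Unit = {
    println(s"same order:        ${miniBatchOrder == plainOrder}")
    println(s"same final result: ${sameFinalResult(miniBatchOrder, plainOrder)}")
  }
}
```

A result that contains retractions would first have to be folded into its final state before a comparison of this kind is meaningful.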

            libenchao Benchao Li added a comment -

            I'm wondering what kind of disorder this is; I cannot tell directly from the Jira description. Could you paste the output into the description, so that others can see the problem without running the test themselves?

            xuyangzhong xuyang added a comment -

            lsy The situation is that although I set the parallelism to 1, the order of the output with minibatch is still disrupted.

            Hi, libenchao. Thanks for the reminder. I have attached the diff of the results with the minibatch join turned on and off.

            jeyhunkarimov Jeyhun Karimov added a comment -

            Hi xuyangzhong, the ordering differs even with parallelism 1 because of the Set used in the MiniBatch operator. IMO this is expected behavior.
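The general effect can be illustrated without Flink. The sketch below is not the MiniBatch operator's actual code; it is a simplified stand-in that assumes a buffer keyed by the join key, with hypothetical names throughout. A hash-based collection makes no promise about iteration order, while an insertion-ordered one replays keys in arrival order.

```scala
import scala.collection.mutable

object BufferOrderSketch {
  // Hypothetical stand-in for buffering one mini-batch of records keyed by join key `a`.
  def fill(buffer: mutable.Map[Int, String]): mutable.Map[Int, String] = {
    (1 to 8).foreach(a => buffer.put(a, a.toString))
    buffer
  }

  // "Flushing" the buffer means iterating it; the order is whatever the collection yields.
  def drain[K, V](buffer: mutable.Map[K, V]): List[K] = buffer.keysIterator.toList

  def main(args: Array[String]): Unit = {
    // Hash-based: iteration order is unspecified and may differ from arrival order.
    val hashed = drain(fill(mutable.HashMap.empty[Int, String]))
    // Insertion-ordered: iteration follows arrival order, at the cost of extra bookkeeping.
    val linked = drain(fill(mutable.LinkedHashMap.empty[Int, String]))
    println(s"hash-based buffer:        $hashed")
    println(s"insertion-ordered buffer: $linked")
  }
}
```

Whether the hash-based variant actually prints a shuffled order depends on the hash function and collection implementation, so no particular output is claimed here.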

            xu_shuai_ Shuai Xu added a comment -

            Hi xuyangzhong. This is expected behavior. Maintaining order would require additional data structures, which would degrade performance, and the ordering would only hold when parallelism is set to 1. If order preservation is required with a parallelism of 1, it suffices to turn off the minibatch feature.
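For the reproduction case in the description, turning the feature off could look like the fragment below. It reuses the `tEnv` and the configuration style from that test and is meant to replace the three mini-batch settings there; it is a configuration fragment, not a standalone program.

```scala
import org.apache.flink.table.api.config.ExecutionConfigOptions

// Assumes `tEnv` is the table environment from the reproduction test.
// With mini-batch disabled, the allow-latency and size options no longer apply.
tEnv.getConfig.getConfiguration
  .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(false))
```

The same switch is exposed as the `table.exec.mini-batch.enabled` configuration key.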

            xuyangzhong xuyang added a comment -

            Thanks for your answer, xu_shuai_. That sounds good to me. I'll close this jira.


            People

              Assignee: Unassigned
              Reporter: xuyangzhong xuyang
              Votes: 0
              Watchers: 5
