Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-34380

Strange RowKinds and intermediate records in the output when using mini-batch join

    XML | Word | Printable | JSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.19.0
    • 2.0.0
    • Table SQL / Runtime
    • None

    Description

      // Add this test to CalcItCase to reproduce the issue
      
      /**
       * Reproduction case for the mini-batch join issue described in this report.
       *
       * Registers a four-row changelog (+I, -U, +U, -D) for key 1, exposes it through two
       * identical `values` tables (`t1`, `t2`), enables mini-batch execution and then
       * self-joins the two tables on column `a`. The plan is printed first, followed by the
       * changelog rows produced by the join.
       */
      @Test
        def test(): Unit = {
          // Run with parallelism 1.
          // NOTE(review): presumably to keep the printed row order deterministic - confirm.
          env.setParallelism(1)
          // Changelog for key 1: insert (1, "1"), update it to (1, "99"), then delete (1, "99").
          val rows = Seq(
            changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
            changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
            changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
            changelogRow("-D", java.lang.Integer.valueOf(1), "99")
          )
          // The returned id is referenced by both table definitions below via 'data-id'.
          val dataId = TestValuesTableFactory.registerData(rows)
      
          // First join input: `values` connector table declared with 'bounded' = 'false'.
          val ddl =
            s"""
               |CREATE TABLE t1 (
               |  a int,
               |  b string
               |) WITH (
               |  'connector' = 'values',
               |  'data-id' = '$dataId',
               |  'bounded' = 'false'
               |)
             """.stripMargin
          tEnv.executeSql(ddl)
      
          // Second join input: same schema, options and data id as t1; only the name differs.
          val ddl2 =
            s"""
               |CREATE TABLE t2 (
               |  a int,
               |  b string
               |) WITH (
               |  'connector' = 'values',
               |  'data-id' = '$dataId',
               |  'bounded' = 'false'
               |)
             """.stripMargin
          tEnv.executeSql(ddl2)
      
          // Enable mini-batch with an allowed latency of 5 seconds and a batch size of 3.
          // The batch size (3) is smaller than the number of registered rows (4).
          // NOTE(review): presumably this forces a flush part-way through the changelog,
          // which is what exposes the intermediate records - confirm against the operator.
          tEnv.getConfig.getConfiguration
            .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
          tEnv.getConfig.getConfiguration
            .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5))
          tEnv.getConfig.getConfiguration
            .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
      
          // Print the plan of the join query for inspection.
          println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
      
          // Execute the same join and print its result rows; the issue's "Output" section
          // lists what was observed.
          tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
        } 

      Output:

      +----+-------------+-----------------+-------------+---------+
      | op |           a |               b |          a0 |      b0 |
      +----+-------------+-----------------+-------------+---------+
      | +U |           1 |               1 |           1 |      99 |
      | +U |           1 |              99 |           1 |      99 |
      | -U |           1 |               1 |           1 |      99 |
      | -D |           1 |              99 |           1 |      99 |
      +----+-------------+-----------------+-------------+---------+

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              xuyangzhong xuyang
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated: