Flink / FLINK-20887

Non-deterministic functions return different values even when referenced through the same column name


Details

    Description

      Add the following test case to CalcITCase.scala

      @Test
      def testRand(): Unit = {
        checkResult(
          s"""
             |SELECT b - a FROM (
             |  SELECT r + 5 AS a, r + 7 AS b FROM (
             |    SELECT RAND() AS r FROM SmallTable3
             |  ) t1
             |) t2
             |""".stripMargin,
          Seq(row(2), row(2), row(2))
        )
      }
      

      The test fails with the following output:

      Results
       == Correct Result - 3 ==   == Actual Result - 3 ==
      !2                          1.051329250417921
      !2                          1.3649146677814379
      !2                          1.787784536771345
              
      Plan:
      == Abstract Syntax Tree ==
      LogicalProject(EXPR$0=[-($1, $0)])
      +- LogicalProject(a=[+($0, 5)], b=[+($0, 7)])
         +- LogicalProject(r=[RAND()])
            +- LogicalTableScan(table=[[default_catalog, default_database, SmallTable3]])
      
      == Optimized Logical Plan ==
      Calc(select=[-(+(RAND(), 7), +(RAND(), 5)) AS EXPR$0])
      +- BoundedStreamScan(table=[[default_catalog, default_database, SmallTable3]], fields=[a, b, c])
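
The optimized plan above contains two separate RAND() calls, whereas the query defines r once per row. The following is a minimal sketch in plain Scala of the difference between the two evaluation strategies. It uses no Flink or Calcite APIs, and the names RandInlining, evaluatedOnce and inlined are hypothetical, chosen only for illustration.

```scala
import scala.util.Random

object RandInlining {
  // Intended semantics of the query: r is computed once per row,
  // and both a and b reuse that single value.
  def evaluatedOnce(rng: Random): Double = {
    val r = rng.nextDouble()
    val a = r + 5
    val b = r + 7
    b - a
  }

  // What the merged Calc computes: every reference to r has been
  // replaced by its own RAND() call, so two independent draws are made.
  def inlined(rng: Random): Double =
    (rng.nextDouble() + 7) - (rng.nextDouble() + 5)
}
```

With a single draw the difference is always 2. With two independent draws it is 2 plus the difference of two values in [0, 1), so it falls somewhere between 1 and 3, which is consistent with the actual results reported above.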
      

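      It seems that the projections are merged incorrectly: the optimized plan inlines RAND() into both operands, so each reference to r is evaluated independently. However, if you run the following test case in FlinkCalcMergeRuleTest.scala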

      @Test
      def testCalcMergeWithRandomUdf(): Unit = {
        val sqlQuery = "SELECT ts + a, ts + b FROM " +
          "(SELECT a, b, random_udf(a) AS ts FROM MyTable WHERE a = b) t"
        util.verifyRelPlan(sqlQuery)
      }
      

      The result is

      <Root>
        <TestCase name="testCalcMergeWithRandomUdf">
          <Resource name="sql">
            <![CDATA[SELECT ts + a, ts + b FROM (SELECT a, b, random_udf(a) AS ts FROM MyTable WHERE a = b) t]]>
          </Resource>
          <Resource name="ast">
            <![CDATA[
      LogicalProject(EXPR$0=[+(random_udf($0), $0)], EXPR$1=[+(random_udf($0), $1)])
      +- LogicalFilter(condition=[=($0, $1)])
         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
      ]]>
          </Resource>
          <Resource name="optimized rel plan">
            <![CDATA[
      FlinkLogicalCalc(select=[+(random_udf(a), a) AS EXPR$0, +(random_udf(a), b) AS EXPR$1], where=[=(a, b)])
      +- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
      ]]>
          </Resource>
        </TestCase>
      </Root>
      

      The plan already looks incorrect in the AST: random_udf($0) appears twice in the LogicalProject, before the optimizer has run. So this seems to be a bug in Calcite?
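
One possible guard against this kind of merge can be sketched on a toy expression model. This is an assumption about what a safe condition could look like, not the actual Calcite or Flink implementation. The types Expr, InputRef, Literal, Call and the object MergeGuard are hypothetical and are not Calcite's RexNode classes. The idea is to refuse to inline a lower projection into an upper one when a non-deterministic lower expression is referenced more than once.

```scala
// Toy expression model, for illustration only.
sealed trait Expr
final case class InputRef(index: Int) extends Expr
final case class Literal(value: Double) extends Expr
final case class Call(op: String, args: List[Expr], deterministic: Boolean = true) extends Expr

object MergeGuard {
  // True if no call in the expression tree is marked non-deterministic.
  def isDeterministic(e: Expr): Boolean = e match {
    case Call(_, args, det) => det && args.forall(isDeterministic)
    case _                  => true
  }

  // Counts how often each field of the lower projection is referenced
  // by the expressions of the upper projection.
  def refCounts(exprs: List[Expr]): Map[Int, Int] = {
    def refs(e: Expr): List[Int] = e match {
      case InputRef(i)      => List(i)
      case Call(_, args, _) => args.flatMap(refs)
      case _                => Nil
    }
    exprs.flatMap(refs).groupBy(identity).map { case (i, occurrences) => i -> occurrences.size }
  }

  // Merging is allowed only if every non-deterministic lower expression
  // is referenced at most once by the upper projection.
  def canMerge(upper: List[Expr], lower: List[Expr]): Boolean = {
    val counts = refCounts(upper)
    lower.zipWithIndex.forall { case (e, i) =>
      isDeterministic(e) || counts.getOrElse(i, 0) <= 1
    }
  }
}
```

For the query in this issue, the lower projection would be `List(Call("RAND", Nil, deterministic = false))` and the upper one `List(Call("+", List(InputRef(0), Literal(5))), Call("+", List(InputRef(0), Literal(7))))`. Under this sketch `MergeGuard.canMerge(upper, lower)` returns false, so the two projections would stay separate and RAND() would be evaluated once per row.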

            People

              lincoln.86xy lincoln lee
              TsReaper Caizhi Weng